forked from mirrors/linux
		
	io_uring: limit local tw done
Instead of eagerly running all available local tw, limit the amount of local tw done to the max of IO_LOCAL_TW_DEFAULT_MAX (20) and wait_nr. The value of 20 is chosen as a reasonable heuristic to allow enough work batching but also keep latency down. Add a retry_llist that maintains a list of local tw that couldn't be done in time. No synchronisation is needed since it is only modified within the task context. Signed-off-by: David Wei <dw@davidwei.uk> Link: https://lore.kernel.org/r/20241120221452.3762588-3-dw@davidwei.uk Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
		
							parent
							
								
									40cfe55324
								
							
						
					
					
						commit
						f46b9cdb22
					
				
					 3 changed files with 34 additions and 12 deletions
				
			
		| 
						 | 
				
			
			@ -335,6 +335,7 @@ struct io_ring_ctx {
 | 
			
		|||
	 */
 | 
			
		||||
	struct {
 | 
			
		||||
		struct llist_head	work_llist;
 | 
			
		||||
		struct llist_head	retry_llist;
 | 
			
		||||
		unsigned long		check_cq;
 | 
			
		||||
		atomic_t		cq_wait_nr;
 | 
			
		||||
		atomic_t		cq_timeouts;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -122,6 +122,7 @@
 | 
			
		|||
 | 
			
		||||
#define IO_COMPL_BATCH			32
 | 
			
		||||
#define IO_REQ_ALLOC_BATCH		8
 | 
			
		||||
#define IO_LOCAL_TW_DEFAULT_MAX		20
 | 
			
		||||
 | 
			
		||||
struct io_defer_entry {
 | 
			
		||||
	struct list_head	list;
 | 
			
		||||
| 
						 | 
				
			
			@ -1256,6 +1257,8 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
 | 
			
		|||
	struct llist_node *node = llist_del_all(&ctx->work_llist);
 | 
			
		||||
 | 
			
		||||
	__io_fallback_tw(node, false);
 | 
			
		||||
	node = llist_del_all(&ctx->retry_llist);
 | 
			
		||||
	__io_fallback_tw(node, false);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
 | 
			
		||||
| 
						 | 
				
			
			@ -1270,37 +1273,55 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
 | 
			
		|||
	return false;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int __io_run_local_work_loop(struct llist_node **node,
 | 
			
		||||
				    struct io_tw_state *ts,
 | 
			
		||||
				    int events)
 | 
			
		||||
{
 | 
			
		||||
	while (*node) {
 | 
			
		||||
		struct llist_node *next = (*node)->next;
 | 
			
		||||
		struct io_kiocb *req = container_of(*node, struct io_kiocb,
 | 
			
		||||
						    io_task_work.node);
 | 
			
		||||
		INDIRECT_CALL_2(req->io_task_work.func,
 | 
			
		||||
				io_poll_task_func, io_req_rw_complete,
 | 
			
		||||
				req, ts);
 | 
			
		||||
		*node = next;
 | 
			
		||||
		if (--events <= 0)
 | 
			
		||||
			break;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return events;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 | 
			
		||||
			       int min_events)
 | 
			
		||||
{
 | 
			
		||||
	struct llist_node *node;
 | 
			
		||||
	unsigned int loops = 0;
 | 
			
		||||
	int ret = 0;
 | 
			
		||||
	int ret, limit;
 | 
			
		||||
 | 
			
		||||
	if (WARN_ON_ONCE(ctx->submitter_task != current))
 | 
			
		||||
		return -EEXIST;
 | 
			
		||||
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 | 
			
		||||
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 | 
			
		||||
	limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
 | 
			
		||||
again:
 | 
			
		||||
	ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit);
 | 
			
		||||
	if (ctx->retry_llist.first)
 | 
			
		||||
		goto retry_done;
 | 
			
		||||
 | 
			
		||||
	/*
 | 
			
		||||
	 * llists are in reverse order, flip it back the right way before
 | 
			
		||||
	 * running the pending items.
 | 
			
		||||
	 */
 | 
			
		||||
	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
 | 
			
		||||
	while (node) {
 | 
			
		||||
		struct llist_node *next = node->next;
 | 
			
		||||
		struct io_kiocb *req = container_of(node, struct io_kiocb,
 | 
			
		||||
						    io_task_work.node);
 | 
			
		||||
		INDIRECT_CALL_2(req->io_task_work.func,
 | 
			
		||||
				io_poll_task_func, io_req_rw_complete,
 | 
			
		||||
				req, ts);
 | 
			
		||||
		ret++;
 | 
			
		||||
		node = next;
 | 
			
		||||
	}
 | 
			
		||||
	ret = __io_run_local_work_loop(&node, ts, ret);
 | 
			
		||||
	ctx->retry_llist.first = node;
 | 
			
		||||
	loops++;
 | 
			
		||||
 | 
			
		||||
	ret = limit - ret;
 | 
			
		||||
	if (io_run_local_work_continue(ctx, ret, min_events))
 | 
			
		||||
		goto again;
 | 
			
		||||
retry_done:
 | 
			
		||||
	io_submit_flush_completions(ctx);
 | 
			
		||||
	if (io_run_local_work_continue(ctx, ret, min_events))
 | 
			
		||||
		goto again;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -349,7 +349,7 @@ static inline int io_run_task_work(void)
 | 
			
		|||
 | 
			
		||||
static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
 | 
			
		||||
{
 | 
			
		||||
	return !llist_empty(&ctx->work_llist);
 | 
			
		||||
	return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in a new issue