Mirror of https://github.com/torvalds/linux.git, synced 2025-11-01 00:58:39 +02:00
	io_uring/sqpoll: manage task_work privately
Decouple from task_work running, and cap the number of entries we process
at a time. If we exceed that number, push remaining entries to a retry list
that we'll process first next time.

We cap the number of entries to process at 8, which is fairly random. We
just want to get enough per-ctx batching here, while not processing
endlessly.

Since we manually run PF_IO_WORKER related task_work anyway as the task
never exits to userspace, with this we no longer need to add an actual
task_work item to the per-process list.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:

    parent 2708af1adc
    commit af5d68f889

3 changed files with 82 additions and 17 deletions
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -1173,7 +1173,14 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
 	percpu_ref_put(&ctx->refs);
 }
 
-static void handle_tw_list(struct llist_node *node, unsigned int *count)
+/*
+ * Run queued task_work, returning the number of entries processed in *count.
+ * If more entries than max_entries are available, stop processing once this
+ * is reached and return the rest of the list.
+ */
+struct llist_node *io_handle_tw_list(struct llist_node *node,
+				     unsigned int *count,
+				     unsigned int max_entries)
 {
 	struct io_ring_ctx *ctx = NULL;
 	struct io_tw_state ts = { };
@@ -1200,9 +1207,10 @@ static void handle_tw_list(struct llist_node *node, unsigned int *count)
 			ctx = NULL;
 			cond_resched();
 		}
-	} while (node);
+	} while (node && *count < max_entries);
 
 	ctx_flush_and_put(ctx, &ts);
+	return node;
 }
 
 /**
@@ -1247,27 +1255,41 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
 	}
 }
 
-void tctx_task_work(struct callback_head *cb)
+struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
+				      unsigned int max_entries,
+				      unsigned int *count)
 {
-	struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
-						  task_work);
 	struct llist_node *node;
-	unsigned int count = 0;
 
 	if (unlikely(current->flags & PF_EXITING)) {
 		io_fallback_tw(tctx, true);
-		return;
+		return NULL;
 	}
 
 	node = llist_del_all(&tctx->task_list);
-	if (node)
-		handle_tw_list(llist_reverse_order(node), &count);
+	if (node) {
+		node = llist_reverse_order(node);
+		node = io_handle_tw_list(node, count, max_entries);
+	}
 
 	/* relaxed read is enough as only the task itself sets ->in_cancel */
 	if (unlikely(atomic_read(&tctx->in_cancel)))
 		io_uring_drop_tctx_refs(current);
 
-	trace_io_uring_task_work_run(tctx, count);
+	trace_io_uring_task_work_run(tctx, *count);
+	return node;
+}
+
+void tctx_task_work(struct callback_head *cb)
+{
+	struct io_uring_task *tctx;
+	struct llist_node *ret;
+	unsigned int count = 0;
+
+	tctx = container_of(cb, struct io_uring_task, task_work);
+	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
+	/* can't happen */
+	WARN_ON_ONCE(ret);
 }
 
 static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
@@ -1350,6 +1372,10 @@ static void io_req_normal_work_add(struct io_kiocb *req)
 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 
+	/* SQPOLL doesn't need the task_work added, it'll run it itself */
+	if (ctx->flags & IORING_SETUP_SQPOLL)
+		return;
+
 	if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
 		return;
 
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -57,6 +57,8 @@ void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use);
 void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts);
 void io_req_task_queue_fail(struct io_kiocb *req, int ret);
 void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
+struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
+struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
 void tctx_task_work(struct callback_head *cb);
 __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
 int io_uring_alloc_task_context(struct task_struct *task,
@@ -275,6 +277,8 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
 
 static inline int io_run_task_work(void)
 {
+	bool ret = false;
+
 	/*
 	 * Always check-and-clear the task_work notification signal. With how
 	 * signaling works for task_work, we can find it set with nothing to
@@ -286,18 +290,26 @@ static inline int io_run_task_work(void)
 	 * PF_IO_WORKER never returns to userspace, so check here if we have
 	 * notify work that needs processing.
 	 */
-	if (current->flags & PF_IO_WORKER &&
-	    test_thread_flag(TIF_NOTIFY_RESUME)) {
-		__set_current_state(TASK_RUNNING);
-		resume_user_mode_work(NULL);
+	if (current->flags & PF_IO_WORKER) {
+		if (test_thread_flag(TIF_NOTIFY_RESUME)) {
+			__set_current_state(TASK_RUNNING);
+			resume_user_mode_work(NULL);
+		}
+		if (current->io_uring) {
+			unsigned int count = 0;
+
+			tctx_task_work_run(current->io_uring, UINT_MAX, &count);
+			if (count)
+				ret = true;
+		}
 	}
 	if (task_work_pending(current)) {
 		__set_current_state(TASK_RUNNING);
 		task_work_run();
-		return 1;
+		ret = true;
 	}
 
-	return 0;
+	return ret;
 }
 
 static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -18,6 +18,7 @@
 #include "sqpoll.h"
 
 #define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
+#define IORING_TW_CAP_ENTRIES_VALUE	8
 
 enum {
 	IO_SQ_THREAD_SHOULD_STOP = 0,
@@ -219,8 +220,31 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
 	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
 }
 
+/*
+ * Run task_work, processing the retry_list first. The retry_list holds
+ * entries that we passed on in the previous run, if we had more task_work
+ * than we were asked to process. Newly queued task_work isn't run until the
+ * retry list has been fully processed.
+ */
+static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
+{
+	struct io_uring_task *tctx = current->io_uring;
+	unsigned int count = 0;
+
+	if (*retry_list) {
+		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
+		if (count >= max_entries)
+			return count;
+		max_entries -= count;
+	}
+
+	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
+	return count;
+}
+
 static int io_sq_thread(void *data)
 {
+	struct llist_node *retry_list = NULL;
 	struct io_sq_data *sqd = data;
 	struct io_ring_ctx *ctx;
 	unsigned long timeout = 0;
@@ -257,7 +281,7 @@ static int io_sq_thread(void *data)
 			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
 				sqt_spin = true;
 		}
-		if (io_run_task_work())
+		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
 			sqt_spin = true;
 
 		if (sqt_spin || !time_after(jiffies, timeout)) {
@@ -312,6 +336,9 @@ static int io_sq_thread(void *data)
 		timeout = jiffies + sqd->sq_thread_idle;
 	}
 
+	if (retry_list)
+		io_sq_tw(&retry_list, UINT_MAX);
+
 	io_uring_cancel_generic(true, sqd);
 	sqd->thread = NULL;
 	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
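To make the batching scheme concrete, here is a standalone userspace sketch (not the kernel code) of the behavior described in the commit message and in the io_sq_tw() comment above: run at most max_entries items per call, and park whatever is left on a retry list that is drained before new work on the next call. All names below (work_item, run_capped, run_some) are made up for illustration.

/*
 * Userspace illustration of capped task_work batching with a retry list.
 * Hypothetical names; only the control flow mirrors io_sq_tw().
 */
#include <stdio.h>
#include <stddef.h>

struct work_item {
	int id;
	struct work_item *next;
};

/* Run up to max_entries items from list; return the unprocessed tail. */
static struct work_item *run_capped(struct work_item *list,
				    unsigned int max_entries,
				    unsigned int *count)
{
	while (list && *count < max_entries) {
		printf("processing item %d\n", list->id);
		list = list->next;
		(*count)++;
	}
	return list;
}

/* Retry list first, then newly queued work, capped at max_entries total. */
static unsigned int run_some(struct work_item **retry_list,
			     struct work_item **new_work,
			     unsigned int max_entries)
{
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = run_capped(*retry_list, max_entries, &count);
		if (count >= max_entries)
			return count;
	}
	*retry_list = run_capped(*new_work, max_entries, &count);
	*new_work = NULL;
	return count;
}

int main(void)
{
	struct work_item items[12];
	struct work_item *new_work = &items[0], *retry_list = NULL;
	int i;

	for (i = 0; i < 12; i++) {
		items[i].id = i;
		items[i].next = (i < 11) ? &items[i + 1] : NULL;
	}

	/* First pass caps at 8, leaving 4 items on the retry list. */
	printf("ran %u entries\n", run_some(&retry_list, &new_work, 8));
	/* Second pass drains the retry list before any new work. */
	printf("ran %u entries\n", run_some(&retry_list, &new_work, 8));
	return 0;
}

With a cap of 8 and 12 queued items, the first call runs items 0-7 and parks 8-11 on the retry list; the second call drains those four before looking at new work, matching the "process the retry_list first" rule described above.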