mirror of
				https://github.com/torvalds/linux.git
				synced 2025-11-04 10:40:15 +02:00 
			
		
		
		
	io_uring: support true async buffered reads, if file provides it
If the file is flagged with FMODE_BUF_RASYNC, then we don't have to punt the buffered read to an io-wq worker. Instead we can rely on page unlocking callbacks to support retry based async IO. This is a lot more efficient than doing async thread offload. The retry is done similarly to how we handle poll based retry. From the unlock callback, we simply queue the retry to a task_work based handler. Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
		
							parent
							
								
									d1932dc3dc
								
							
						
					
					
						commit
						bcf5a06304
					
				
					 1 changed files with 135 additions and 4 deletions
				
			
		
							
								
								
									
										139
									
								
								fs/io_uring.c
									
									
									
									
									
								
							
							
						
						
									
										139
									
								
								fs/io_uring.c
									
									
									
									
									
								
							| 
						 | 
					@ -78,6 +78,7 @@
 | 
				
			||||||
#include <linux/fs_struct.h>
 | 
					#include <linux/fs_struct.h>
 | 
				
			||||||
#include <linux/splice.h>
 | 
					#include <linux/splice.h>
 | 
				
			||||||
#include <linux/task_work.h>
 | 
					#include <linux/task_work.h>
 | 
				
			||||||
 | 
					#include <linux/pagemap.h>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#define CREATE_TRACE_POINTS
 | 
					#define CREATE_TRACE_POINTS
 | 
				
			||||||
#include <trace/events/io_uring.h>
 | 
					#include <trace/events/io_uring.h>
 | 
				
			||||||
| 
						 | 
					@ -503,6 +504,8 @@ struct io_async_rw {
 | 
				
			||||||
	struct iovec			*iov;
 | 
						struct iovec			*iov;
 | 
				
			||||||
	ssize_t				nr_segs;
 | 
						ssize_t				nr_segs;
 | 
				
			||||||
	ssize_t				size;
 | 
						ssize_t				size;
 | 
				
			||||||
 | 
						struct wait_page_queue		wpq;
 | 
				
			||||||
 | 
						struct callback_head		task_work;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct io_async_ctx {
 | 
					struct io_async_ctx {
 | 
				
			||||||
| 
						 | 
					@ -2750,6 +2753,126 @@ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 | 
				
			||||||
	return 0;
 | 
						return 0;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static void __io_async_buf_error(struct io_kiocb *req, int error)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						struct io_ring_ctx *ctx = req->ctx;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						spin_lock_irq(&ctx->completion_lock);
 | 
				
			||||||
 | 
						io_cqring_fill_event(req, error);
 | 
				
			||||||
 | 
						io_commit_cqring(ctx);
 | 
				
			||||||
 | 
						spin_unlock_irq(&ctx->completion_lock);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						io_cqring_ev_posted(ctx);
 | 
				
			||||||
 | 
						req_set_fail_links(req);
 | 
				
			||||||
 | 
						io_double_put_req(req);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static void io_async_buf_cancel(struct callback_head *cb)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						struct io_async_rw *rw;
 | 
				
			||||||
 | 
						struct io_kiocb *req;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						rw = container_of(cb, struct io_async_rw, task_work);
 | 
				
			||||||
 | 
						req = rw->wpq.wait.private;
 | 
				
			||||||
 | 
						__io_async_buf_error(req, -ECANCELED);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static void io_async_buf_retry(struct callback_head *cb)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						struct io_async_rw *rw;
 | 
				
			||||||
 | 
						struct io_ring_ctx *ctx;
 | 
				
			||||||
 | 
						struct io_kiocb *req;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						rw = container_of(cb, struct io_async_rw, task_work);
 | 
				
			||||||
 | 
						req = rw->wpq.wait.private;
 | 
				
			||||||
 | 
						ctx = req->ctx;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						__set_current_state(TASK_RUNNING);
 | 
				
			||||||
 | 
						if (!io_sq_thread_acquire_mm(ctx, req)) {
 | 
				
			||||||
 | 
							mutex_lock(&ctx->uring_lock);
 | 
				
			||||||
 | 
							__io_queue_sqe(req, NULL);
 | 
				
			||||||
 | 
							mutex_unlock(&ctx->uring_lock);
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							__io_async_buf_error(req, -EFAULT);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
 | 
				
			||||||
 | 
								     int sync, void *arg)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						struct wait_page_queue *wpq;
 | 
				
			||||||
 | 
						struct io_kiocb *req = wait->private;
 | 
				
			||||||
 | 
						struct io_async_rw *rw = &req->io->rw;
 | 
				
			||||||
 | 
						struct wait_page_key *key = arg;
 | 
				
			||||||
 | 
						struct task_struct *tsk;
 | 
				
			||||||
 | 
						int ret;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						wpq = container_of(wait, struct wait_page_queue, wait);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ret = wake_page_match(wpq, key);
 | 
				
			||||||
 | 
						if (ret != 1)
 | 
				
			||||||
 | 
							return ret;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						list_del_init(&wait->entry);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						init_task_work(&rw->task_work, io_async_buf_retry);
 | 
				
			||||||
 | 
						/* submit ref gets dropped, acquire a new one */
 | 
				
			||||||
 | 
						refcount_inc(&req->refs);
 | 
				
			||||||
 | 
						tsk = req->task;
 | 
				
			||||||
 | 
						ret = task_work_add(tsk, &rw->task_work, true);
 | 
				
			||||||
 | 
						if (unlikely(ret)) {
 | 
				
			||||||
 | 
							/* queue just for cancelation */
 | 
				
			||||||
 | 
							init_task_work(&rw->task_work, io_async_buf_cancel);
 | 
				
			||||||
 | 
							tsk = io_wq_get_task(req->ctx->io_wq);
 | 
				
			||||||
 | 
							task_work_add(tsk, &rw->task_work, true);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						wake_up_process(tsk);
 | 
				
			||||||
 | 
						return 1;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static bool io_rw_should_retry(struct io_kiocb *req)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						struct kiocb *kiocb = &req->rw.kiocb;
 | 
				
			||||||
 | 
						int ret;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/* never retry for NOWAIT, we just complete with -EAGAIN */
 | 
				
			||||||
 | 
						if (req->flags & REQ_F_NOWAIT)
 | 
				
			||||||
 | 
							return false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/* already tried, or we're doing O_DIRECT */
 | 
				
			||||||
 | 
						if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ))
 | 
				
			||||||
 | 
							return false;
 | 
				
			||||||
 | 
						/*
 | 
				
			||||||
 | 
						 * just use poll if we can, and don't attempt if the fs doesn't
 | 
				
			||||||
 | 
						 * support callback based unlocks
 | 
				
			||||||
 | 
						 */
 | 
				
			||||||
 | 
						if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
 | 
				
			||||||
 | 
							return false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/*
 | 
				
			||||||
 | 
						 * If request type doesn't require req->io to defer in general,
 | 
				
			||||||
 | 
						 * we need to allocate it here
 | 
				
			||||||
 | 
						 */
 | 
				
			||||||
 | 
						if (!req->io && __io_alloc_async_ctx(req))
 | 
				
			||||||
 | 
							return false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ret = kiocb_wait_page_queue_init(kiocb, &req->io->rw.wpq,
 | 
				
			||||||
 | 
											io_async_buf_func, req);
 | 
				
			||||||
 | 
						if (!ret) {
 | 
				
			||||||
 | 
							io_get_req_task(req);
 | 
				
			||||||
 | 
							return true;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return false;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static int io_iter_do_read(struct io_kiocb *req, struct iov_iter *iter)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						if (req->file->f_op->read_iter)
 | 
				
			||||||
 | 
							return call_read_iter(req->file, &req->rw.kiocb, iter);
 | 
				
			||||||
 | 
						return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static int io_read(struct io_kiocb *req, bool force_nonblock)
 | 
					static int io_read(struct io_kiocb *req, bool force_nonblock)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
 | 
						struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
 | 
				
			||||||
| 
						 | 
					@ -2784,10 +2907,7 @@ static int io_read(struct io_kiocb *req, bool force_nonblock)
 | 
				
			||||||
		unsigned long nr_segs = iter.nr_segs;
 | 
							unsigned long nr_segs = iter.nr_segs;
 | 
				
			||||||
		ssize_t ret2 = 0;
 | 
							ssize_t ret2 = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if (req->file->f_op->read_iter)
 | 
							ret2 = io_iter_do_read(req, &iter);
 | 
				
			||||||
			ret2 = call_read_iter(req->file, kiocb, &iter);
 | 
					 | 
				
			||||||
		else
 | 
					 | 
				
			||||||
			ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		/* Catch -EAGAIN return for forced non-blocking submission */
 | 
							/* Catch -EAGAIN return for forced non-blocking submission */
 | 
				
			||||||
		if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) {
 | 
							if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) {
 | 
				
			||||||
| 
						 | 
					@ -2804,6 +2924,17 @@ static int io_read(struct io_kiocb *req, bool force_nonblock)
 | 
				
			||||||
			if (!(req->flags & REQ_F_NOWAIT) &&
 | 
								if (!(req->flags & REQ_F_NOWAIT) &&
 | 
				
			||||||
			    !file_can_poll(req->file))
 | 
								    !file_can_poll(req->file))
 | 
				
			||||||
				req->flags |= REQ_F_MUST_PUNT;
 | 
									req->flags |= REQ_F_MUST_PUNT;
 | 
				
			||||||
 | 
								/* if we can retry, do so with the callbacks armed */
 | 
				
			||||||
 | 
								if (io_rw_should_retry(req)) {
 | 
				
			||||||
 | 
									ret2 = io_iter_do_read(req, &iter);
 | 
				
			||||||
 | 
									if (ret2 == -EIOCBQUEUED) {
 | 
				
			||||||
 | 
										goto out_free;
 | 
				
			||||||
 | 
									} else if (ret2 != -EAGAIN) {
 | 
				
			||||||
 | 
										kiocb_done(kiocb, ret2);
 | 
				
			||||||
 | 
										goto out_free;
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								kiocb->ki_flags &= ~IOCB_WAITQ;
 | 
				
			||||||
			return -EAGAIN;
 | 
								return -EAGAIN;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in a new issue