forked from mirrors/linux
		
	io_uring: add support for link with drain
To support the link with drain, we need to do two parts.
There is an sqes:
    0     1     2     3     4     5     6
 +-----+-----+-----+-----+-----+-----+-----+
 |  N  |  L  |  L  | L+D |  N  |  N  |  N  |
 +-----+-----+-----+-----+-----+-----+-----+
First, we need to ensure that the IO before the link is completed;
an easy way is to set the drain flag on the link list's head, so
all subsequent IO will be inserted into the defer_list.
	+-----+
    (0) |  N  |
	+-----+
           |          (2)         (3)         (4)
	+-----+     +-----+     +-----+     +-----+
    (1) | L+D | --> |  L  | --> | L+D | --> |  N  |
	+-----+     +-----+     +-----+     +-----+
           |
	+-----+
    (5) |  N  |
	+-----+
           |
	+-----+
    (6) |  N  |
	+-----+
Second, ensure that the following IO will not be completed first;
an easy way is to create a shadow of the drain IO and insert it into
the defer_list. This way, as long as the drain IO is not processed,
the following IO in the defer_list will not be actively processed.
	+-----+
    (0) |  N  |
	+-----+
           |          (2)         (3)         (4)
	+-----+     +-----+     +-----+     +-----+
    (1) | L+D | --> |  L  | --> | L+D | --> |  N  |
	+-----+     +-----+     +-----+     +-----+
           |
	+-----+
   ('3) |  D  |   <== This is a shadow of (3)
	+-----+
           |
	+-----+
    (5) |  N  |
	+-----+
           |
	+-----+
    (6) |  N  |
	+-----+
Signed-off-by: Jackie Liu <liuyun01@kylinos.cn>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
			
			
This commit is contained in:
		
							parent
							
								
									8776f3fa15
								
							
						
					
					
						commit
						4fe2c96315
					
				
					 1 changed file with 97 additions and 17 deletions
				
			
		
							
								
								
									
										114
									
								
								fs/io_uring.c
									
									
									
									
									
								
							
							
						
						
									
										114
									
								
								fs/io_uring.c
									
									
									
									
									
								
							| 
						 | 
					@ -312,6 +312,7 @@ struct io_kiocb {
 | 
				
			||||||
#define REQ_F_LINK		64	/* linked sqes */
 | 
					#define REQ_F_LINK		64	/* linked sqes */
 | 
				
			||||||
#define REQ_F_LINK_DONE		128	/* linked sqes done */
 | 
					#define REQ_F_LINK_DONE		128	/* linked sqes done */
 | 
				
			||||||
#define REQ_F_FAIL_LINK		256	/* fail rest of links */
 | 
					#define REQ_F_FAIL_LINK		256	/* fail rest of links */
 | 
				
			||||||
 | 
					#define REQ_F_SHADOW_DRAIN	512	/* link-drain shadow req */
 | 
				
			||||||
	u64			user_data;
 | 
						u64			user_data;
 | 
				
			||||||
	u32			result;
 | 
						u32			result;
 | 
				
			||||||
	u32			sequence;
 | 
						u32			sequence;
 | 
				
			||||||
| 
						 | 
					@ -343,6 +344,7 @@ struct io_submit_state {
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void io_sq_wq_submit_work(struct work_struct *work);
 | 
					static void io_sq_wq_submit_work(struct work_struct *work);
 | 
				
			||||||
 | 
					static void __io_free_req(struct io_kiocb *req);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static struct kmem_cache *req_cachep;
 | 
					static struct kmem_cache *req_cachep;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -448,6 +450,11 @@ static void io_commit_cqring(struct io_ring_ctx *ctx)
 | 
				
			||||||
	__io_commit_cqring(ctx);
 | 
						__io_commit_cqring(ctx);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	while ((req = io_get_deferred_req(ctx)) != NULL) {
 | 
						while ((req = io_get_deferred_req(ctx)) != NULL) {
 | 
				
			||||||
 | 
							if (req->flags & REQ_F_SHADOW_DRAIN) {
 | 
				
			||||||
 | 
								/* Just for drain, free it. */
 | 
				
			||||||
 | 
								__io_free_req(req);
 | 
				
			||||||
 | 
								continue;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		req->flags |= REQ_F_IO_DRAINED;
 | 
							req->flags |= REQ_F_IO_DRAINED;
 | 
				
			||||||
		queue_work(ctx->sqo_wq, &req->work);
 | 
							queue_work(ctx->sqo_wq, &req->work);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -2015,10 +2022,14 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
 | 
				
			||||||
	flags = READ_ONCE(s->sqe->flags);
 | 
						flags = READ_ONCE(s->sqe->flags);
 | 
				
			||||||
	fd = READ_ONCE(s->sqe->fd);
 | 
						fd = READ_ONCE(s->sqe->fd);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (flags & IOSQE_IO_DRAIN) {
 | 
						if (flags & IOSQE_IO_DRAIN)
 | 
				
			||||||
		req->flags |= REQ_F_IO_DRAIN;
 | 
							req->flags |= REQ_F_IO_DRAIN;
 | 
				
			||||||
		req->sequence = s->sequence;
 | 
						/*
 | 
				
			||||||
	}
 | 
						 * All io need record the previous position, if LINK vs DARIN,
 | 
				
			||||||
 | 
						 * it can be used to mark the position of the first IO in the
 | 
				
			||||||
 | 
						 * link list.
 | 
				
			||||||
 | 
						 */
 | 
				
			||||||
 | 
						req->sequence = s->sequence;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (!io_op_needs_file(s->sqe))
 | 
						if (!io_op_needs_file(s->sqe))
 | 
				
			||||||
		return 0;
 | 
							return 0;
 | 
				
			||||||
| 
						 | 
					@ -2040,20 +2051,11 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
 | 
				
			||||||
	return 0;
 | 
						return 0;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 | 
					static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 | 
				
			||||||
			struct sqe_submit *s)
 | 
								struct sqe_submit *s)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	int ret;
 | 
						int ret;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ret = io_req_defer(ctx, req, s->sqe);
 | 
					 | 
				
			||||||
	if (ret) {
 | 
					 | 
				
			||||||
		if (ret != -EIOCBQUEUED) {
 | 
					 | 
				
			||||||
			io_free_req(req);
 | 
					 | 
				
			||||||
			io_cqring_add_event(ctx, s->sqe->user_data, ret);
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return 0;
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	ret = __io_submit_sqe(ctx, req, s, true);
 | 
						ret = __io_submit_sqe(ctx, req, s, true);
 | 
				
			||||||
	if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
 | 
						if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
 | 
				
			||||||
		struct io_uring_sqe *sqe_copy;
 | 
							struct io_uring_sqe *sqe_copy;
 | 
				
			||||||
| 
						 | 
					@ -2096,6 +2098,64 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 | 
				
			||||||
	return ret;
 | 
						return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 | 
				
			||||||
 | 
								struct sqe_submit *s)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						int ret;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ret = io_req_defer(ctx, req, s->sqe);
 | 
				
			||||||
 | 
						if (ret) {
 | 
				
			||||||
 | 
							if (ret != -EIOCBQUEUED) {
 | 
				
			||||||
 | 
								io_free_req(req);
 | 
				
			||||||
 | 
								io_cqring_add_event(ctx, s->sqe->user_data, ret);
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return 0;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return __io_queue_sqe(ctx, req, s);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
 | 
				
			||||||
 | 
								      struct sqe_submit *s, struct io_kiocb *shadow)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						int ret;
 | 
				
			||||||
 | 
						int need_submit = false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if (!shadow)
 | 
				
			||||||
 | 
							return io_queue_sqe(ctx, req, s);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/*
 | 
				
			||||||
 | 
						 * Mark the first IO in link list as DRAIN, let all the following
 | 
				
			||||||
 | 
						 * IOs enter the defer list. all IO needs to be completed before link
 | 
				
			||||||
 | 
						 * list.
 | 
				
			||||||
 | 
						 */
 | 
				
			||||||
 | 
						req->flags |= REQ_F_IO_DRAIN;
 | 
				
			||||||
 | 
						ret = io_req_defer(ctx, req, s->sqe);
 | 
				
			||||||
 | 
						if (ret) {
 | 
				
			||||||
 | 
							if (ret != -EIOCBQUEUED) {
 | 
				
			||||||
 | 
								io_free_req(req);
 | 
				
			||||||
 | 
								io_cqring_add_event(ctx, s->sqe->user_data, ret);
 | 
				
			||||||
 | 
								return 0;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							/*
 | 
				
			||||||
 | 
							 * If ret == 0 means that all IOs in front of link io are
 | 
				
			||||||
 | 
							 * running done. let's queue link head.
 | 
				
			||||||
 | 
							 */
 | 
				
			||||||
 | 
							need_submit = true;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/* Insert shadow req to defer_list, blocking next IOs */
 | 
				
			||||||
 | 
						spin_lock_irq(&ctx->completion_lock);
 | 
				
			||||||
 | 
						list_add_tail(&shadow->list, &ctx->defer_list);
 | 
				
			||||||
 | 
						spin_unlock_irq(&ctx->completion_lock);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if (need_submit)
 | 
				
			||||||
 | 
							return __io_queue_sqe(ctx, req, s);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return 0;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#define SQE_VALID_FLAGS	(IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
 | 
					#define SQE_VALID_FLAGS	(IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
 | 
					static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
 | 
				
			||||||
| 
						 | 
					@ -2241,6 +2301,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct io_submit_state state, *statep = NULL;
 | 
						struct io_submit_state state, *statep = NULL;
 | 
				
			||||||
	struct io_kiocb *link = NULL;
 | 
						struct io_kiocb *link = NULL;
 | 
				
			||||||
 | 
						struct io_kiocb *shadow_req = NULL;
 | 
				
			||||||
	bool prev_was_link = false;
 | 
						bool prev_was_link = false;
 | 
				
			||||||
	int i, submitted = 0;
 | 
						int i, submitted = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -2255,11 +2316,20 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
 | 
				
			||||||
		 * that's the end of the chain. Submit the previous link.
 | 
							 * that's the end of the chain. Submit the previous link.
 | 
				
			||||||
		 */
 | 
							 */
 | 
				
			||||||
		if (!prev_was_link && link) {
 | 
							if (!prev_was_link && link) {
 | 
				
			||||||
			io_queue_sqe(ctx, link, &link->submit);
 | 
								io_queue_link_head(ctx, link, &link->submit, shadow_req);
 | 
				
			||||||
			link = NULL;
 | 
								link = NULL;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
 | 
							prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) {
 | 
				
			||||||
 | 
								if (!shadow_req) {
 | 
				
			||||||
 | 
									shadow_req = io_get_req(ctx, NULL);
 | 
				
			||||||
 | 
									shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
 | 
				
			||||||
 | 
									refcount_dec(&shadow_req->refs);
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								shadow_req->sequence = sqes[i].sequence;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if (unlikely(mm_fault)) {
 | 
							if (unlikely(mm_fault)) {
 | 
				
			||||||
			io_cqring_add_event(ctx, sqes[i].sqe->user_data,
 | 
								io_cqring_add_event(ctx, sqes[i].sqe->user_data,
 | 
				
			||||||
						-EFAULT);
 | 
											-EFAULT);
 | 
				
			||||||
| 
						 | 
					@ -2273,7 +2343,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (link)
 | 
						if (link)
 | 
				
			||||||
		io_queue_sqe(ctx, link, &link->submit);
 | 
							io_queue_link_head(ctx, link, &link->submit, shadow_req);
 | 
				
			||||||
	if (statep)
 | 
						if (statep)
 | 
				
			||||||
		io_submit_state_end(&state);
 | 
							io_submit_state_end(&state);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -2409,6 +2479,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct io_submit_state state, *statep = NULL;
 | 
						struct io_submit_state state, *statep = NULL;
 | 
				
			||||||
	struct io_kiocb *link = NULL;
 | 
						struct io_kiocb *link = NULL;
 | 
				
			||||||
 | 
						struct io_kiocb *shadow_req = NULL;
 | 
				
			||||||
	bool prev_was_link = false;
 | 
						bool prev_was_link = false;
 | 
				
			||||||
	int i, submit = 0;
 | 
						int i, submit = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -2428,11 +2499,20 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 | 
				
			||||||
		 * that's the end of the chain. Submit the previous link.
 | 
							 * that's the end of the chain. Submit the previous link.
 | 
				
			||||||
		 */
 | 
							 */
 | 
				
			||||||
		if (!prev_was_link && link) {
 | 
							if (!prev_was_link && link) {
 | 
				
			||||||
			io_queue_sqe(ctx, link, &link->submit);
 | 
								io_queue_link_head(ctx, link, &link->submit, shadow_req);
 | 
				
			||||||
			link = NULL;
 | 
								link = NULL;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
 | 
							prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
 | 
				
			||||||
 | 
								if (!shadow_req) {
 | 
				
			||||||
 | 
									shadow_req = io_get_req(ctx, NULL);
 | 
				
			||||||
 | 
									shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
 | 
				
			||||||
 | 
									refcount_dec(&shadow_req->refs);
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								shadow_req->sequence = s.sequence;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		s.has_user = true;
 | 
							s.has_user = true;
 | 
				
			||||||
		s.needs_lock = false;
 | 
							s.needs_lock = false;
 | 
				
			||||||
		s.needs_fixed_file = false;
 | 
							s.needs_fixed_file = false;
 | 
				
			||||||
| 
						 | 
					@ -2442,7 +2522,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 | 
				
			||||||
	io_commit_sqring(ctx);
 | 
						io_commit_sqring(ctx);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (link)
 | 
						if (link)
 | 
				
			||||||
		io_queue_sqe(ctx, link, &link->submit);
 | 
							io_queue_link_head(ctx, link, &link->submit, shadow_req);
 | 
				
			||||||
	if (statep)
 | 
						if (statep)
 | 
				
			||||||
		io_submit_state_end(statep);
 | 
							io_submit_state_end(statep);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in a new issue