forked from mirrors/linux
		
	xprtrdma: Use xprt_pin_rqst in rpcrdma_reply_handler
Adopt the use of xprt_pin_rqst to eliminate contention between
Call-side users of rb_lock and the use of rb_lock in
rpcrdma_reply_handler.
This replaces the mechanism introduced in 431af645cf ("xprtrdma:
Fix client lock-up after application signal fires").
Use recv_lock to quickly find the completing rqst, pin it, then
drop the lock. At that point invalidation and pull-up of the Reply
XDR can be done. Both are often expensive operations.
Finally, take recv_lock again to signal completion to the RPC
layer. It also protects adjustment of "cwnd".
This greatly reduces the amount of time a lock is held by the
reply handler. Comparing lock_stat results shows a marked decrease
in contention on rb_lock and recv_lock.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
[trond.myklebust@primarydata.com: Remove call to rpcrdma_buffer_put() from
   the "out_norqst:" path in rpcrdma_reply_handler.]
Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
			
			
This commit is contained in:
		
							parent
							
								
									f9773b22a2
								
							
						
					
					
						commit
						9590d083c1
					
				
					 5 changed files with 19 additions and 77 deletions
				
			
		| 
						 | 
					@ -855,6 +855,7 @@ void xprt_pin_rqst(struct rpc_rqst *req)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
 | 
						set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					EXPORT_SYMBOL_GPL(xprt_pin_rqst);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 | 
					 * xprt_unpin_rqst - Unpin a request on the transport receive list
 | 
				
			||||||
| 
						 | 
					@ -870,6 +871,7 @@ void xprt_unpin_rqst(struct rpc_rqst *req)
 | 
				
			||||||
	if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
 | 
						if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
 | 
				
			||||||
		wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
 | 
							wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
 | 
					static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
 | 
				
			||||||
__must_hold(&req->rq_xprt->recv_lock)
 | 
					__must_hold(&req->rq_xprt->recv_lock)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -781,9 +781,6 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
 | 
				
			||||||
		rtype = rpcrdma_areadch;
 | 
							rtype = rpcrdma_areadch;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	req->rl_xid = rqst->rq_xid;
 | 
					 | 
				
			||||||
	rpcrdma_insert_req(&r_xprt->rx_buf, req);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	/* This implementation supports the following combinations
 | 
						/* This implementation supports the following combinations
 | 
				
			||||||
	 * of chunk lists in one RPC-over-RDMA Call message:
 | 
						 * of chunk lists in one RPC-over-RDMA Call message:
 | 
				
			||||||
	 *
 | 
						 *
 | 
				
			||||||
| 
						 | 
					@ -1226,14 +1223,12 @@ rpcrdma_reply_handler(struct work_struct *work)
 | 
				
			||||||
	struct rpcrdma_rep *rep =
 | 
						struct rpcrdma_rep *rep =
 | 
				
			||||||
			container_of(work, struct rpcrdma_rep, rr_work);
 | 
								container_of(work, struct rpcrdma_rep, rr_work);
 | 
				
			||||||
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 | 
						struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 | 
				
			||||||
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 | 
					 | 
				
			||||||
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 | 
						struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 | 
				
			||||||
	struct xdr_stream *xdr = &rep->rr_stream;
 | 
						struct xdr_stream *xdr = &rep->rr_stream;
 | 
				
			||||||
	struct rpcrdma_req *req;
 | 
						struct rpcrdma_req *req;
 | 
				
			||||||
	struct rpc_rqst *rqst;
 | 
						struct rpc_rqst *rqst;
 | 
				
			||||||
	__be32 *p, xid, vers, proc;
 | 
						__be32 *p, xid, vers, proc;
 | 
				
			||||||
	unsigned long cwnd;
 | 
						unsigned long cwnd;
 | 
				
			||||||
	struct list_head mws;
 | 
					 | 
				
			||||||
	int status;
 | 
						int status;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
 | 
						dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
 | 
				
			||||||
| 
						 | 
					@ -1259,21 +1254,14 @@ rpcrdma_reply_handler(struct work_struct *work)
 | 
				
			||||||
	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 | 
						/* Match incoming rpcrdma_rep to an rpcrdma_req to
 | 
				
			||||||
	 * get context for handling any incoming chunks.
 | 
						 * get context for handling any incoming chunks.
 | 
				
			||||||
	 */
 | 
						 */
 | 
				
			||||||
	spin_lock(&buf->rb_lock);
 | 
						spin_lock(&xprt->recv_lock);
 | 
				
			||||||
	req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf, xid);
 | 
						rqst = xprt_lookup_rqst(xprt, xid);
 | 
				
			||||||
	if (!req)
 | 
						if (!rqst)
 | 
				
			||||||
		goto out_nomatch;
 | 
							goto out_norqst;
 | 
				
			||||||
	if (req->rl_reply)
 | 
						xprt_pin_rqst(rqst);
 | 
				
			||||||
		goto out_duplicate;
 | 
						spin_unlock(&xprt->recv_lock);
 | 
				
			||||||
 | 
						req = rpcr_to_rdmar(rqst);
 | 
				
			||||||
	list_replace_init(&req->rl_registered, &mws);
 | 
					 | 
				
			||||||
	rpcrdma_mark_remote_invalidation(&mws, rep);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	/* Avoid races with signals and duplicate replies
 | 
					 | 
				
			||||||
	 * by marking this req as matched.
 | 
					 | 
				
			||||||
	 */
 | 
					 | 
				
			||||||
	req->rl_reply = rep;
 | 
						req->rl_reply = rep;
 | 
				
			||||||
	spin_unlock(&buf->rb_lock);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
 | 
						dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
 | 
				
			||||||
		__func__, rep, req, be32_to_cpu(xid));
 | 
							__func__, rep, req, be32_to_cpu(xid));
 | 
				
			||||||
| 
						 | 
					@ -1285,17 +1273,12 @@ rpcrdma_reply_handler(struct work_struct *work)
 | 
				
			||||||
	 * waking the next RPC waits until this RPC has relinquished
 | 
						 * waking the next RPC waits until this RPC has relinquished
 | 
				
			||||||
	 * all its Send Queue entries.
 | 
						 * all its Send Queue entries.
 | 
				
			||||||
	 */
 | 
						 */
 | 
				
			||||||
	if (!list_empty(&mws))
 | 
						if (!list_empty(&req->rl_registered)) {
 | 
				
			||||||
		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
 | 
							rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
 | 
				
			||||||
 | 
							r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
 | 
				
			||||||
 | 
											    &req->rl_registered);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* Perform XID lookup, reconstruction of the RPC reply, and
 | 
					 | 
				
			||||||
	 * RPC completion while holding the transport lock to ensure
 | 
					 | 
				
			||||||
	 * the rep, rqst, and rq_task pointers remain stable.
 | 
					 | 
				
			||||||
	 */
 | 
					 | 
				
			||||||
	spin_lock(&xprt->recv_lock);
 | 
					 | 
				
			||||||
	rqst = xprt_lookup_rqst(xprt, xid);
 | 
					 | 
				
			||||||
	if (!rqst)
 | 
					 | 
				
			||||||
		goto out_norqst;
 | 
					 | 
				
			||||||
	xprt->reestablish_timeout = 0;
 | 
						xprt->reestablish_timeout = 0;
 | 
				
			||||||
	if (vers != rpcrdma_version)
 | 
						if (vers != rpcrdma_version)
 | 
				
			||||||
		goto out_badversion;
 | 
							goto out_badversion;
 | 
				
			||||||
| 
						 | 
					@ -1317,12 +1300,14 @@ rpcrdma_reply_handler(struct work_struct *work)
 | 
				
			||||||
		goto out_badheader;
 | 
							goto out_badheader;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
out:
 | 
					out:
 | 
				
			||||||
 | 
						spin_lock(&xprt->recv_lock);
 | 
				
			||||||
	cwnd = xprt->cwnd;
 | 
						cwnd = xprt->cwnd;
 | 
				
			||||||
	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
 | 
						xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
 | 
				
			||||||
	if (xprt->cwnd > cwnd)
 | 
						if (xprt->cwnd > cwnd)
 | 
				
			||||||
		xprt_release_rqst_cong(rqst->rq_task);
 | 
							xprt_release_rqst_cong(rqst->rq_task);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	xprt_complete_rqst(rqst->rq_task, status);
 | 
						xprt_complete_rqst(rqst->rq_task, status);
 | 
				
			||||||
 | 
						xprt_unpin_rqst(rqst);
 | 
				
			||||||
	spin_unlock(&xprt->recv_lock);
 | 
						spin_unlock(&xprt->recv_lock);
 | 
				
			||||||
	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
 | 
						dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
 | 
				
			||||||
		__func__, xprt, rqst, status);
 | 
							__func__, xprt, rqst, status);
 | 
				
			||||||
| 
						 | 
					@ -1360,26 +1345,13 @@ rpcrdma_reply_handler(struct work_struct *work)
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
out_norqst:
 | 
					out_norqst:
 | 
				
			||||||
	spin_unlock(&xprt->recv_lock);
 | 
						spin_unlock(&xprt->recv_lock);
 | 
				
			||||||
	rpcrdma_buffer_put(req);
 | 
					 | 
				
			||||||
	dprintk("RPC:       %s: race, no rqst left for req %p\n",
 | 
					 | 
				
			||||||
		__func__, req);
 | 
					 | 
				
			||||||
	return;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
out_shortreply:
 | 
					 | 
				
			||||||
	dprintk("RPC:       %s: short/invalid reply\n", __func__);
 | 
					 | 
				
			||||||
	goto repost;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
out_nomatch:
 | 
					 | 
				
			||||||
	spin_unlock(&buf->rb_lock);
 | 
					 | 
				
			||||||
	dprintk("RPC:       %s: no match for incoming xid 0x%08x\n",
 | 
						dprintk("RPC:       %s: no match for incoming xid 0x%08x\n",
 | 
				
			||||||
		__func__, be32_to_cpu(xid));
 | 
							__func__, be32_to_cpu(xid));
 | 
				
			||||||
	goto repost;
 | 
						goto repost;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
out_duplicate:
 | 
					out_shortreply:
 | 
				
			||||||
	spin_unlock(&buf->rb_lock);
 | 
						dprintk("RPC:       %s: short/invalid reply\n", __func__);
 | 
				
			||||||
	dprintk("RPC:       %s: "
 | 
						goto repost;
 | 
				
			||||||
		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
 | 
					 | 
				
			||||||
		__func__, rep, req, be32_to_cpu(xid));
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
/* If no pending RPC transaction was matched, post a replacement
 | 
					/* If no pending RPC transaction was matched, post a replacement
 | 
				
			||||||
 * receive buffer before returning.
 | 
					 * receive buffer before returning.
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -685,7 +685,6 @@ xprt_rdma_free(struct rpc_task *task)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 | 
						dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	rpcrdma_remove_req(&r_xprt->rx_buf, req);
 | 
					 | 
				
			||||||
	if (!list_empty(&req->rl_registered))
 | 
						if (!list_empty(&req->rl_registered))
 | 
				
			||||||
		ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
 | 
							ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
 | 
				
			||||||
	rpcrdma_unmap_sges(ia, req);
 | 
						rpcrdma_unmap_sges(ia, req);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1001,7 +1001,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 | 
				
			||||||
	spin_lock_init(&buf->rb_recovery_lock);
 | 
						spin_lock_init(&buf->rb_recovery_lock);
 | 
				
			||||||
	INIT_LIST_HEAD(&buf->rb_mws);
 | 
						INIT_LIST_HEAD(&buf->rb_mws);
 | 
				
			||||||
	INIT_LIST_HEAD(&buf->rb_all);
 | 
						INIT_LIST_HEAD(&buf->rb_all);
 | 
				
			||||||
	INIT_LIST_HEAD(&buf->rb_pending);
 | 
					 | 
				
			||||||
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
 | 
						INIT_LIST_HEAD(&buf->rb_stale_mrs);
 | 
				
			||||||
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
 | 
						INIT_DELAYED_WORK(&buf->rb_refresh_worker,
 | 
				
			||||||
			  rpcrdma_mr_refresh_worker);
 | 
								  rpcrdma_mr_refresh_worker);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -340,7 +340,6 @@ enum {
 | 
				
			||||||
struct rpcrdma_buffer;
 | 
					struct rpcrdma_buffer;
 | 
				
			||||||
struct rpcrdma_req {
 | 
					struct rpcrdma_req {
 | 
				
			||||||
	struct list_head	rl_list;
 | 
						struct list_head	rl_list;
 | 
				
			||||||
	__be32			rl_xid;
 | 
					 | 
				
			||||||
	unsigned int		rl_mapped_sges;
 | 
						unsigned int		rl_mapped_sges;
 | 
				
			||||||
	unsigned int		rl_connect_cookie;
 | 
						unsigned int		rl_connect_cookie;
 | 
				
			||||||
	struct rpcrdma_buffer	*rl_buffer;
 | 
						struct rpcrdma_buffer	*rl_buffer;
 | 
				
			||||||
| 
						 | 
					@ -404,7 +403,6 @@ struct rpcrdma_buffer {
 | 
				
			||||||
	int			rb_send_count, rb_recv_count;
 | 
						int			rb_send_count, rb_recv_count;
 | 
				
			||||||
	struct list_head	rb_send_bufs;
 | 
						struct list_head	rb_send_bufs;
 | 
				
			||||||
	struct list_head	rb_recv_bufs;
 | 
						struct list_head	rb_recv_bufs;
 | 
				
			||||||
	struct list_head	rb_pending;
 | 
					 | 
				
			||||||
	u32			rb_max_requests;
 | 
						u32			rb_max_requests;
 | 
				
			||||||
	atomic_t		rb_credits;	/* most recent credit grant */
 | 
						atomic_t		rb_credits;	/* most recent credit grant */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -557,34 +555,6 @@ void rpcrdma_destroy_req(struct rpcrdma_req *);
 | 
				
			||||||
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 | 
					int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 | 
				
			||||||
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 | 
					void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static inline void
 | 
					 | 
				
			||||||
rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
	spin_lock(&buffers->rb_lock);
 | 
					 | 
				
			||||||
	if (list_empty(&req->rl_list))
 | 
					 | 
				
			||||||
		list_add_tail(&req->rl_list, &buffers->rb_pending);
 | 
					 | 
				
			||||||
	spin_unlock(&buffers->rb_lock);
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
static inline struct rpcrdma_req *
 | 
					 | 
				
			||||||
rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
	struct rpcrdma_req *pos;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	list_for_each_entry(pos, &buffers->rb_pending, rl_list)
 | 
					 | 
				
			||||||
		if (pos->rl_xid == xid)
 | 
					 | 
				
			||||||
			return pos;
 | 
					 | 
				
			||||||
	return NULL;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
static inline void
 | 
					 | 
				
			||||||
rpcrdma_remove_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
	spin_lock(&buffers->rb_lock);
 | 
					 | 
				
			||||||
	list_del(&req->rl_list);
 | 
					 | 
				
			||||||
	spin_unlock(&buffers->rb_lock);
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 | 
					struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 | 
				
			||||||
void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
 | 
					void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
 | 
				
			||||||
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
 | 
					struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in a new issue