	xprtrdma: Replace rpcrdma_receive_wq with a per-xprt workqueue
To address a connection-close ordering problem, we need the ability to
drain the RPC completions running on rpcrdma_receive_wq for just one
transport. Give each transport its own RPC completion workqueue, and
drain that workqueue when disconnecting the transport.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
Parent: 6ceea36890
Commit: 6d2d0ee27c

4 changed files with 44 additions and 48 deletions
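
The change boils down to the standard Linux workqueue lifecycle, applied per
transport rather than globally. The sketch below is illustrative only and is
not part of this commit: struct my_xprt and the my_xprt_* helpers are
hypothetical names, while alloc_workqueue(), queue_work(), drain_workqueue(),
and destroy_workqueue() are the real kernel APIs the patch relies on.

#include <linux/errno.h>
#include <linux/workqueue.h>

/* Hypothetical transport: one completion workqueue per instance,
 * mirroring rb_completion_wq in struct rpcrdma_buffer.
 */
struct my_xprt {
	struct workqueue_struct *completion_wq;
	struct work_struct	reply_work;
};

static void my_reply_worker(struct work_struct *work)
{
	/* deferred Reply processing would run here */
}

/* Setup: allocate a queue named after this transport's address. */
static int my_xprt_setup(struct my_xprt *xprt, const char *addr)
{
	xprt->completion_wq = alloc_workqueue("rpcrdma-%s",
					      WQ_MEM_RECLAIM | WQ_HIGHPRI,
					      0, addr);
	if (!xprt->completion_wq)
		return -ENOMEM;
	INIT_WORK(&xprt->reply_work, my_reply_worker);
	return 0;
}

/* Receive completion: defer Reply handling to this xprt's own queue. */
static void my_reply_arrived(struct my_xprt *xprt)
{
	queue_work(xprt->completion_wq, &xprt->reply_work);
}

/* Disconnect: wait only for this transport's deferred work;
 * other transports are unaffected.
 */
static void my_xprt_disconnect(struct my_xprt *xprt)
{
	drain_workqueue(xprt->completion_wq);
}

/* Teardown: the queue must be idle before it is destroyed. */
static void my_xprt_teardown(struct my_xprt *xprt)
{
	if (xprt->completion_wq) {
		destroy_workqueue(xprt->completion_wq);
		xprt->completion_wq = NULL;
	}
}

With a single global rpcrdma_receive_wq, a drain would have had to wait for
every transport's pending Reply work; giving each transport its own queue
makes the drain local to the one connection being torn down.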
				
			
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1356,7 +1356,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 
 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
-	queue_work(rpcrdma_receive_wq, &rep->rr_work);
+	queue_work(buf->rb_completion_wq, &rep->rr_work);
 	return;
 
 out_badversion:
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -444,10 +444,14 @@ xprt_rdma_close(struct rpc_xprt *xprt)
 	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
+	might_sleep();
+
 	dprintk("RPC:       %s: closing xprt %p\n", __func__, xprt);
 
+	/* Prevent marshaling and sending of new requests */
+	xprt_clear_connected(xprt);
+
 	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
-		xprt_clear_connected(xprt);
 		rpcrdma_ia_remove(ia);
 		return;
 	}
@@ -858,8 +862,6 @@ void xprt_rdma_cleanup(void)
 		dprintk("RPC:       %s: xprt_unregister returned %i\n",
 			__func__, rc);
 
-	rpcrdma_destroy_wq();
-
 	rc = xprt_unregister_transport(&xprt_rdma_bc);
 	if (rc)
 		dprintk("RPC:       %s: xprt_unregister(bc) returned %i\n",
@@ -870,20 +872,13 @@ int xprt_rdma_init(void)
 {
 	int rc;
 
-	rc = rpcrdma_alloc_wq();
+	rc = xprt_register_transport(&xprt_rdma);
 	if (rc)
 		return rc;
 
-	rc = xprt_register_transport(&xprt_rdma);
-	if (rc) {
-		rpcrdma_destroy_wq();
-		return rc;
-	}
-
 	rc = xprt_register_transport(&xprt_rdma_bc);
 	if (rc) {
 		xprt_unregister_transport(&xprt_rdma);
-		rpcrdma_destroy_wq();
 		return rc;
 	}
 
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -80,33 +80,23 @@ static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
 static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
-struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
-
-int
-rpcrdma_alloc_wq(void)
+/* Wait for outstanding transport work to finish.
+ */
+static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 {
-	struct workqueue_struct *recv_wq;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	recv_wq = alloc_workqueue("xprtrdma_receive",
-				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
-				  0);
-	if (!recv_wq)
-		return -ENOMEM;
+	/* Flush Receives, then wait for deferred Reply work
+	 * to complete.
+	 */
+	ib_drain_qp(ia->ri_id->qp);
+	drain_workqueue(buf->rb_completion_wq);
 
-	rpcrdma_receive_wq = recv_wq;
-	return 0;
-}
-
-void
-rpcrdma_destroy_wq(void)
-{
-	struct workqueue_struct *wq;
-
-	if (rpcrdma_receive_wq) {
-		wq = rpcrdma_receive_wq;
-		rpcrdma_receive_wq = NULL;
-		destroy_workqueue(wq);
-	}
+	/* Deferred Reply processing might have scheduled
+	 * local invalidations.
+	 */
+	ib_drain_sq(ia->ri_id->qp);
 }
 
 /**
@@ -483,7 +473,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
 	 *   connection is already gone.
 	 */
 	if (ia->ri_id->qp) {
-		ib_drain_qp(ia->ri_id->qp);
+		rpcrdma_xprt_drain(r_xprt);
 		rdma_destroy_qp(ia->ri_id);
 		ia->ri_id->qp = NULL;
 	}
@@ -825,8 +815,10 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	return rc;
 }
 
-/*
- * rpcrdma_ep_disconnect
+/**
+ * rpcrdma_ep_disconnect - Disconnect underlying transport
+ * @ep: endpoint to disconnect
+ * @ia: associated interface adapter
  *
  * This is separate from destroy to facilitate the ability
  * to reconnect without recreating the endpoint.
@@ -837,19 +829,20 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 void
 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
+	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
+						   rx_ep);
 	int rc;
 
+	/* returns without wait if ID is not connected */
 	rc = rdma_disconnect(ia->ri_id);
 	if (!rc)
-		/* returns without wait if not connected */
 		wait_event_interruptible(ep->rep_connect_wait,
 							ep->rep_connected != 1);
 	else
 		ep->rep_connected = rc;
-	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
-					       rx_ep), rc);
+	trace_xprtrdma_disconnect(r_xprt, rc);
 
-	ib_drain_qp(ia->ri_id->qp);
+	rpcrdma_xprt_drain(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -1183,6 +1176,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 	if (rc)
 		goto out;
 
+	buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
+						WQ_MEM_RECLAIM | WQ_HIGHPRI,
+						0,
+			r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
+	if (!buf->rb_completion_wq)
+		goto out;
+
 	return 0;
 out:
 	rpcrdma_buffer_destroy(buf);
@@ -1241,6 +1241,11 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
 	cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
+	if (buf->rb_completion_wq) {
+		destroy_workqueue(buf->rb_completion_wq);
+		buf->rb_completion_wq = NULL;
+	}
+
 	rpcrdma_sendctxs_destroy(buf);
 
 	while (!list_empty(&buf->rb_recv_bufs)) {
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -412,6 +412,7 @@ struct rpcrdma_buffer {
 
 	u32			rb_bc_max_requests;
 
+	struct workqueue_struct *rb_completion_wq;
 	struct delayed_work	rb_refresh_worker;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -547,8 +548,6 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
 bool frwr_is_supported(struct rpcrdma_ia *);
 bool fmr_is_supported(struct rpcrdma_ia *);
 
-extern struct workqueue_struct *rpcrdma_receive_wq;
-
 /*
  * Endpoint calls - xprtrdma/verbs.c
  */
@@ -603,9 +602,6 @@ rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
 	return __rpcrdma_dma_map_regbuf(ia, rb);
 }
 
-int rpcrdma_alloc_wq(void);
-void rpcrdma_destroy_wq(void);
-
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */