mirror of
				https://github.com/torvalds/linux.git
				synced 2025-11-04 10:40:15 +02:00 
			
		
		
		
	SUNRPC: Simplify TCP receive code by switching to using iterators
Most of this code should also be reusable with other socket types. Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
This commit is contained in:
		
							parent
							
								
									9d96acbc7f
								
							
						
					
					
						commit
						277e4ab7d5
					
				
					 3 changed files with 338 additions and 393 deletions
				
			
		| 
						 | 
				
			
			@ -31,15 +31,16 @@ struct sock_xprt {
 | 
			
		|||
	 * State of TCP reply receive
 | 
			
		||||
	 */
 | 
			
		||||
	struct {
 | 
			
		||||
		__be32		fraghdr,
 | 
			
		||||
		struct {
 | 
			
		||||
			__be32	fraghdr,
 | 
			
		||||
				xid,
 | 
			
		||||
				calldir;
 | 
			
		||||
		} __attribute__((packed));
 | 
			
		||||
 | 
			
		||||
		u32		offset,
 | 
			
		||||
				len;
 | 
			
		||||
 | 
			
		||||
		unsigned long	copied,
 | 
			
		||||
				flags;
 | 
			
		||||
		unsigned long	copied;
 | 
			
		||||
	} recv;
 | 
			
		||||
 | 
			
		||||
	/*
 | 
			
		||||
| 
						 | 
				
			
			@ -76,21 +77,9 @@ struct sock_xprt {
 | 
			
		|||
	void			(*old_error_report)(struct sock *);
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * TCP receive state flags
 | 
			
		||||
 */
 | 
			
		||||
#define TCP_RCV_LAST_FRAG	(1UL << 0)
 | 
			
		||||
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
 | 
			
		||||
#define TCP_RCV_COPY_XID	(1UL << 2)
 | 
			
		||||
#define TCP_RCV_COPY_DATA	(1UL << 3)
 | 
			
		||||
#define TCP_RCV_READ_CALLDIR	(1UL << 4)
 | 
			
		||||
#define TCP_RCV_COPY_CALLDIR	(1UL << 5)
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * TCP RPC flags
 | 
			
		||||
 */
 | 
			
		||||
#define TCP_RPC_REPLY		(1UL << 6)
 | 
			
		||||
 | 
			
		||||
#define XPRT_SOCK_CONNECTING	1U
 | 
			
		||||
#define XPRT_SOCK_DATA_READY	(2)
 | 
			
		||||
#define XPRT_SOCK_UPD_TIMEOUT	(3)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -497,16 +497,6 @@ TRACE_EVENT(xs_tcp_data_ready,
 | 
			
		|||
			__get_str(port), __entry->err, __entry->total)
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
#define rpc_show_sock_xprt_flags(flags) \
 | 
			
		||||
	__print_flags(flags, "|", \
 | 
			
		||||
		{ TCP_RCV_LAST_FRAG, "TCP_RCV_LAST_FRAG" }, \
 | 
			
		||||
		{ TCP_RCV_COPY_FRAGHDR, "TCP_RCV_COPY_FRAGHDR" }, \
 | 
			
		||||
		{ TCP_RCV_COPY_XID, "TCP_RCV_COPY_XID" }, \
 | 
			
		||||
		{ TCP_RCV_COPY_DATA, "TCP_RCV_COPY_DATA" }, \
 | 
			
		||||
		{ TCP_RCV_READ_CALLDIR, "TCP_RCV_READ_CALLDIR" }, \
 | 
			
		||||
		{ TCP_RCV_COPY_CALLDIR, "TCP_RCV_COPY_CALLDIR" }, \
 | 
			
		||||
		{ TCP_RPC_REPLY, "TCP_RPC_REPLY" })
 | 
			
		||||
 | 
			
		||||
TRACE_EVENT(xs_tcp_data_recv,
 | 
			
		||||
	TP_PROTO(struct sock_xprt *xs),
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -516,7 +506,6 @@ TRACE_EVENT(xs_tcp_data_recv,
 | 
			
		|||
		__string(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR])
 | 
			
		||||
		__string(port, xs->xprt.address_strings[RPC_DISPLAY_PORT])
 | 
			
		||||
		__field(u32, xid)
 | 
			
		||||
		__field(unsigned long, flags)
 | 
			
		||||
		__field(unsigned long, copied)
 | 
			
		||||
		__field(unsigned int, reclen)
 | 
			
		||||
		__field(unsigned long, offset)
 | 
			
		||||
| 
						 | 
				
			
			@ -526,15 +515,13 @@ TRACE_EVENT(xs_tcp_data_recv,
 | 
			
		|||
		__assign_str(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR]);
 | 
			
		||||
		__assign_str(port, xs->xprt.address_strings[RPC_DISPLAY_PORT]);
 | 
			
		||||
		__entry->xid = be32_to_cpu(xs->recv.xid);
 | 
			
		||||
		__entry->flags = xs->recv.flags;
 | 
			
		||||
		__entry->copied = xs->recv.copied;
 | 
			
		||||
		__entry->reclen = xs->recv.len;
 | 
			
		||||
		__entry->offset = xs->recv.offset;
 | 
			
		||||
	),
 | 
			
		||||
 | 
			
		||||
	TP_printk("peer=[%s]:%s xid=0x%08x flags=%s copied=%lu reclen=%u offset=%lu",
 | 
			
		||||
	TP_printk("peer=[%s]:%s xid=0x%08x copied=%lu reclen=%u offset=%lu",
 | 
			
		||||
			__get_str(addr), __get_str(port), __entry->xid,
 | 
			
		||||
			rpc_show_sock_xprt_flags(__entry->flags),
 | 
			
		||||
			__entry->copied, __entry->reclen, __entry->offset)
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -47,13 +47,13 @@
 | 
			
		|||
#include <net/checksum.h>
 | 
			
		||||
#include <net/udp.h>
 | 
			
		||||
#include <net/tcp.h>
 | 
			
		||||
#include <linux/bvec.h>
 | 
			
		||||
#include <linux/uio.h>
 | 
			
		||||
 | 
			
		||||
#include <trace/events/sunrpc.h>
 | 
			
		||||
 | 
			
		||||
#include "sunrpc.h"
 | 
			
		||||
 | 
			
		||||
#define RPC_TCP_READ_CHUNK_SZ	(3*512*1024)
 | 
			
		||||
 | 
			
		||||
static void xs_close(struct rpc_xprt *xprt);
 | 
			
		||||
static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
 | 
			
		||||
		struct socket *sock);
 | 
			
		||||
| 
						 | 
				
			
			@ -325,6 +325,323 @@ static void xs_free_peer_addresses(struct rpc_xprt *xprt)
 | 
			
		|||
		}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static size_t
 | 
			
		||||
xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp)
 | 
			
		||||
{
 | 
			
		||||
	size_t i,n;
 | 
			
		||||
 | 
			
		||||
	if (!(buf->flags & XDRBUF_SPARSE_PAGES))
 | 
			
		||||
		return want;
 | 
			
		||||
	if (want > buf->page_len)
 | 
			
		||||
		want = buf->page_len;
 | 
			
		||||
	n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT;
 | 
			
		||||
	for (i = 0; i < n; i++) {
 | 
			
		||||
		if (buf->pages[i])
 | 
			
		||||
			continue;
 | 
			
		||||
		buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp);
 | 
			
		||||
		if (!buf->pages[i]) {
 | 
			
		||||
			buf->page_len = (i * PAGE_SIZE) - buf->page_base;
 | 
			
		||||
			return buf->page_len;
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return want;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static ssize_t
 | 
			
		||||
xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek)
 | 
			
		||||
{
 | 
			
		||||
	ssize_t ret;
 | 
			
		||||
	if (seek != 0)
 | 
			
		||||
		iov_iter_advance(&msg->msg_iter, seek);
 | 
			
		||||
	ret = sock_recvmsg(sock, msg, flags);
 | 
			
		||||
	return ret > 0 ? ret + seek : ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static ssize_t
 | 
			
		||||
xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags,
 | 
			
		||||
		struct kvec *kvec, size_t count, size_t seek)
 | 
			
		||||
{
 | 
			
		||||
	iov_iter_kvec(&msg->msg_iter, READ | ITER_KVEC, kvec, 1, count);
 | 
			
		||||
	return xs_sock_recvmsg(sock, msg, flags, seek);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static ssize_t
 | 
			
		||||
xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags,
 | 
			
		||||
		struct bio_vec *bvec, unsigned long nr, size_t count,
 | 
			
		||||
		size_t seek)
 | 
			
		||||
{
 | 
			
		||||
	iov_iter_bvec(&msg->msg_iter, READ | ITER_BVEC, bvec, nr, count);
 | 
			
		||||
	return xs_sock_recvmsg(sock, msg, flags, seek);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static ssize_t
 | 
			
		||||
xs_read_discard(struct socket *sock, struct msghdr *msg, int flags,
 | 
			
		||||
		size_t count)
 | 
			
		||||
{
 | 
			
		||||
	struct kvec kvec = { 0 };
 | 
			
		||||
	return xs_read_kvec(sock, msg, flags | MSG_TRUNC, &kvec, count, 0);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static ssize_t
 | 
			
		||||
xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
 | 
			
		||||
		struct xdr_buf *buf, size_t count, size_t seek, size_t *read)
 | 
			
		||||
{
 | 
			
		||||
	size_t want, seek_init = seek, offset = 0;
 | 
			
		||||
	ssize_t ret;
 | 
			
		||||
 | 
			
		||||
	if (seek < buf->head[0].iov_len) {
 | 
			
		||||
		want = min_t(size_t, count, buf->head[0].iov_len);
 | 
			
		||||
		ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek);
 | 
			
		||||
		if (ret <= 0)
 | 
			
		||||
			goto sock_err;
 | 
			
		||||
		offset += ret;
 | 
			
		||||
		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
 | 
			
		||||
			goto out;
 | 
			
		||||
		if (ret != want)
 | 
			
		||||
			goto eagain;
 | 
			
		||||
		seek = 0;
 | 
			
		||||
	} else {
 | 
			
		||||
		seek -= buf->head[0].iov_len;
 | 
			
		||||
		offset += buf->head[0].iov_len;
 | 
			
		||||
	}
 | 
			
		||||
	if (seek < buf->page_len) {
 | 
			
		||||
		want = xs_alloc_sparse_pages(buf,
 | 
			
		||||
				min_t(size_t, count - offset, buf->page_len),
 | 
			
		||||
				GFP_NOWAIT);
 | 
			
		||||
		ret = xs_read_bvec(sock, msg, flags, buf->bvec,
 | 
			
		||||
				xdr_buf_pagecount(buf),
 | 
			
		||||
				want + buf->page_base,
 | 
			
		||||
				seek + buf->page_base);
 | 
			
		||||
		if (ret <= 0)
 | 
			
		||||
			goto sock_err;
 | 
			
		||||
		offset += ret - buf->page_base;
 | 
			
		||||
		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
 | 
			
		||||
			goto out;
 | 
			
		||||
		if (ret != want)
 | 
			
		||||
			goto eagain;
 | 
			
		||||
		seek = 0;
 | 
			
		||||
	} else {
 | 
			
		||||
		seek -= buf->page_len;
 | 
			
		||||
		offset += buf->page_len;
 | 
			
		||||
	}
 | 
			
		||||
	if (seek < buf->tail[0].iov_len) {
 | 
			
		||||
		want = min_t(size_t, count - offset, buf->tail[0].iov_len);
 | 
			
		||||
		ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek);
 | 
			
		||||
		if (ret <= 0)
 | 
			
		||||
			goto sock_err;
 | 
			
		||||
		offset += ret;
 | 
			
		||||
		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
 | 
			
		||||
			goto out;
 | 
			
		||||
		if (ret != want)
 | 
			
		||||
			goto eagain;
 | 
			
		||||
	} else
 | 
			
		||||
		offset += buf->tail[0].iov_len;
 | 
			
		||||
	ret = -EMSGSIZE;
 | 
			
		||||
	msg->msg_flags |= MSG_TRUNC;
 | 
			
		||||
out:
 | 
			
		||||
	*read = offset - seek_init;
 | 
			
		||||
	return ret;
 | 
			
		||||
eagain:
 | 
			
		||||
	ret = -EAGAIN;
 | 
			
		||||
	goto out;
 | 
			
		||||
sock_err:
 | 
			
		||||
	offset += seek;
 | 
			
		||||
	goto out;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void
 | 
			
		||||
xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf)
 | 
			
		||||
{
 | 
			
		||||
	if (!transport->recv.copied) {
 | 
			
		||||
		if (buf->head[0].iov_len >= transport->recv.offset)
 | 
			
		||||
			memcpy(buf->head[0].iov_base,
 | 
			
		||||
					&transport->recv.xid,
 | 
			
		||||
					transport->recv.offset);
 | 
			
		||||
		transport->recv.copied = transport->recv.offset;
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static bool
 | 
			
		||||
xs_read_stream_request_done(struct sock_xprt *transport)
 | 
			
		||||
{
 | 
			
		||||
	return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static ssize_t
 | 
			
		||||
xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
 | 
			
		||||
		int flags, struct rpc_rqst *req)
 | 
			
		||||
{
 | 
			
		||||
	struct xdr_buf *buf = &req->rq_private_buf;
 | 
			
		||||
	size_t want, read;
 | 
			
		||||
	ssize_t ret;
 | 
			
		||||
 | 
			
		||||
	xs_read_header(transport, buf);
 | 
			
		||||
 | 
			
		||||
	want = transport->recv.len - transport->recv.offset;
 | 
			
		||||
	ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
 | 
			
		||||
			transport->recv.copied + want, transport->recv.copied,
 | 
			
		||||
			&read);
 | 
			
		||||
	transport->recv.offset += read;
 | 
			
		||||
	transport->recv.copied += read;
 | 
			
		||||
	if (transport->recv.offset == transport->recv.len) {
 | 
			
		||||
		if (xs_read_stream_request_done(transport))
 | 
			
		||||
			msg->msg_flags |= MSG_EOR;
 | 
			
		||||
		return transport->recv.copied;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	switch (ret) {
 | 
			
		||||
	case -EMSGSIZE:
 | 
			
		||||
		return transport->recv.copied;
 | 
			
		||||
	case 0:
 | 
			
		||||
		return -ESHUTDOWN;
 | 
			
		||||
	default:
 | 
			
		||||
		if (ret < 0)
 | 
			
		||||
			return ret;
 | 
			
		||||
	}
 | 
			
		||||
	return -EAGAIN;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static size_t
 | 
			
		||||
xs_read_stream_headersize(bool isfrag)
 | 
			
		||||
{
 | 
			
		||||
	if (isfrag)
 | 
			
		||||
		return sizeof(__be32);
 | 
			
		||||
	return 3 * sizeof(__be32);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static ssize_t
 | 
			
		||||
xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg,
 | 
			
		||||
		int flags, size_t want, size_t seek)
 | 
			
		||||
{
 | 
			
		||||
	struct kvec kvec = {
 | 
			
		||||
		.iov_base = &transport->recv.fraghdr,
 | 
			
		||||
		.iov_len = want,
 | 
			
		||||
	};
 | 
			
		||||
	return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 | 
			
		||||
static ssize_t
 | 
			
		||||
xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
 | 
			
		||||
{
 | 
			
		||||
	struct rpc_xprt *xprt = &transport->xprt;
 | 
			
		||||
	struct rpc_rqst *req;
 | 
			
		||||
	ssize_t ret;
 | 
			
		||||
 | 
			
		||||
	/* Look up and lock the request corresponding to the given XID */
 | 
			
		||||
	req = xprt_lookup_bc_request(xprt, transport->recv.xid);
 | 
			
		||||
	if (!req) {
 | 
			
		||||
		printk(KERN_WARNING "Callback slot table overflowed\n");
 | 
			
		||||
		return -ESHUTDOWN;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ret = xs_read_stream_request(transport, msg, flags, req);
 | 
			
		||||
	if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
 | 
			
		||||
		xprt_complete_bc_request(req, ret);
 | 
			
		||||
 | 
			
		||||
	return ret;
 | 
			
		||||
}
 | 
			
		||||
#else /* CONFIG_SUNRPC_BACKCHANNEL */
 | 
			
		||||
static ssize_t
 | 
			
		||||
xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
 | 
			
		||||
{
 | 
			
		||||
	return -ESHUTDOWN;
 | 
			
		||||
}
 | 
			
		||||
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 | 
			
		||||
 | 
			
		||||
static ssize_t
 | 
			
		||||
xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
 | 
			
		||||
{
 | 
			
		||||
	struct rpc_xprt *xprt = &transport->xprt;
 | 
			
		||||
	struct rpc_rqst *req;
 | 
			
		||||
	ssize_t ret = 0;
 | 
			
		||||
 | 
			
		||||
	/* Look up and lock the request corresponding to the given XID */
 | 
			
		||||
	spin_lock(&xprt->queue_lock);
 | 
			
		||||
	req = xprt_lookup_rqst(xprt, transport->recv.xid);
 | 
			
		||||
	if (!req) {
 | 
			
		||||
		msg->msg_flags |= MSG_TRUNC;
 | 
			
		||||
		goto out;
 | 
			
		||||
	}
 | 
			
		||||
	xprt_pin_rqst(req);
 | 
			
		||||
	spin_unlock(&xprt->queue_lock);
 | 
			
		||||
 | 
			
		||||
	ret = xs_read_stream_request(transport, msg, flags, req);
 | 
			
		||||
 | 
			
		||||
	spin_lock(&xprt->queue_lock);
 | 
			
		||||
	if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
 | 
			
		||||
		xprt_complete_rqst(req->rq_task, ret);
 | 
			
		||||
	xprt_unpin_rqst(req);
 | 
			
		||||
out:
 | 
			
		||||
	spin_unlock(&xprt->queue_lock);
 | 
			
		||||
	return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static ssize_t
 | 
			
		||||
xs_read_stream(struct sock_xprt *transport, int flags)
 | 
			
		||||
{
 | 
			
		||||
	struct msghdr msg = { 0 };
 | 
			
		||||
	size_t want, read = 0;
 | 
			
		||||
	ssize_t ret = 0;
 | 
			
		||||
 | 
			
		||||
	if (transport->recv.len == 0) {
 | 
			
		||||
		want = xs_read_stream_headersize(transport->recv.copied != 0);
 | 
			
		||||
		ret = xs_read_stream_header(transport, &msg, flags, want,
 | 
			
		||||
				transport->recv.offset);
 | 
			
		||||
		if (ret <= 0)
 | 
			
		||||
			goto out_err;
 | 
			
		||||
		transport->recv.offset = ret;
 | 
			
		||||
		if (ret != want) {
 | 
			
		||||
			ret = -EAGAIN;
 | 
			
		||||
			goto out_err;
 | 
			
		||||
		}
 | 
			
		||||
		transport->recv.len = be32_to_cpu(transport->recv.fraghdr) &
 | 
			
		||||
			RPC_FRAGMENT_SIZE_MASK;
 | 
			
		||||
		transport->recv.offset -= sizeof(transport->recv.fraghdr);
 | 
			
		||||
		read = ret;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	switch (be32_to_cpu(transport->recv.calldir)) {
 | 
			
		||||
	case RPC_CALL:
 | 
			
		||||
		ret = xs_read_stream_call(transport, &msg, flags);
 | 
			
		||||
		break;
 | 
			
		||||
	case RPC_REPLY:
 | 
			
		||||
		ret = xs_read_stream_reply(transport, &msg, flags);
 | 
			
		||||
	}
 | 
			
		||||
	if (msg.msg_flags & MSG_TRUNC) {
 | 
			
		||||
		transport->recv.calldir = cpu_to_be32(-1);
 | 
			
		||||
		transport->recv.copied = -1;
 | 
			
		||||
	}
 | 
			
		||||
	if (ret < 0)
 | 
			
		||||
		goto out_err;
 | 
			
		||||
	read += ret;
 | 
			
		||||
	if (transport->recv.offset < transport->recv.len) {
 | 
			
		||||
		ret = xs_read_discard(transport->sock, &msg, flags,
 | 
			
		||||
				transport->recv.len - transport->recv.offset);
 | 
			
		||||
		if (ret <= 0)
 | 
			
		||||
			goto out_err;
 | 
			
		||||
		transport->recv.offset += ret;
 | 
			
		||||
		read += ret;
 | 
			
		||||
		if (transport->recv.offset != transport->recv.len)
 | 
			
		||||
			return -EAGAIN;
 | 
			
		||||
	}
 | 
			
		||||
	if (xs_read_stream_request_done(transport)) {
 | 
			
		||||
		trace_xs_tcp_data_recv(transport);
 | 
			
		||||
		transport->recv.copied = 0;
 | 
			
		||||
	}
 | 
			
		||||
	transport->recv.offset = 0;
 | 
			
		||||
	transport->recv.len = 0;
 | 
			
		||||
	return read;
 | 
			
		||||
out_err:
 | 
			
		||||
	switch (ret) {
 | 
			
		||||
	case 0:
 | 
			
		||||
	case -ESHUTDOWN:
 | 
			
		||||
		xprt_force_disconnect(&transport->xprt);
 | 
			
		||||
		return -ESHUTDOWN;
 | 
			
		||||
	}
 | 
			
		||||
	return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
 | 
			
		||||
 | 
			
		||||
static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
 | 
			
		||||
| 
						 | 
				
			
			@ -484,6 +801,12 @@ static int xs_nospace(struct rpc_rqst *req)
 | 
			
		|||
	return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void
 | 
			
		||||
xs_stream_prepare_request(struct rpc_rqst *req)
 | 
			
		||||
{
 | 
			
		||||
	req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_NOIO);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * Determine if the previous message in the stream was aborted before it
 | 
			
		||||
 * could complete transmission.
 | 
			
		||||
| 
						 | 
				
			
			@ -1157,263 +1480,7 @@ static void xs_tcp_force_close(struct rpc_xprt *xprt)
 | 
			
		|||
	xprt_force_disconnect(xprt);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
 | 
			
		||||
{
 | 
			
		||||
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 | 
			
		||||
	size_t len, used;
 | 
			
		||||
	char *p;
 | 
			
		||||
 | 
			
		||||
	p = ((char *) &transport->recv.fraghdr) + transport->recv.offset;
 | 
			
		||||
	len = sizeof(transport->recv.fraghdr) - transport->recv.offset;
 | 
			
		||||
	used = xdr_skb_read_bits(desc, p, len);
 | 
			
		||||
	transport->recv.offset += used;
 | 
			
		||||
	if (used != len)
 | 
			
		||||
		return;
 | 
			
		||||
 | 
			
		||||
	transport->recv.len = ntohl(transport->recv.fraghdr);
 | 
			
		||||
	if (transport->recv.len & RPC_LAST_STREAM_FRAGMENT)
 | 
			
		||||
		transport->recv.flags |= TCP_RCV_LAST_FRAG;
 | 
			
		||||
	else
 | 
			
		||||
		transport->recv.flags &= ~TCP_RCV_LAST_FRAG;
 | 
			
		||||
	transport->recv.len &= RPC_FRAGMENT_SIZE_MASK;
 | 
			
		||||
 | 
			
		||||
	transport->recv.flags &= ~TCP_RCV_COPY_FRAGHDR;
 | 
			
		||||
	transport->recv.offset = 0;
 | 
			
		||||
 | 
			
		||||
	/* Sanity check of the record length */
 | 
			
		||||
	if (unlikely(transport->recv.len < 8)) {
 | 
			
		||||
		dprintk("RPC:       invalid TCP record fragment length\n");
 | 
			
		||||
		xs_tcp_force_close(xprt);
 | 
			
		||||
		return;
 | 
			
		||||
	}
 | 
			
		||||
	dprintk("RPC:       reading TCP record fragment of length %d\n",
 | 
			
		||||
			transport->recv.len);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
 | 
			
		||||
{
 | 
			
		||||
	if (transport->recv.offset == transport->recv.len) {
 | 
			
		||||
		transport->recv.flags |= TCP_RCV_COPY_FRAGHDR;
 | 
			
		||||
		transport->recv.offset = 0;
 | 
			
		||||
		if (transport->recv.flags & TCP_RCV_LAST_FRAG) {
 | 
			
		||||
			transport->recv.flags &= ~TCP_RCV_COPY_DATA;
 | 
			
		||||
			transport->recv.flags |= TCP_RCV_COPY_XID;
 | 
			
		||||
			transport->recv.copied = 0;
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
 | 
			
		||||
{
 | 
			
		||||
	size_t len, used;
 | 
			
		||||
	char *p;
 | 
			
		||||
 | 
			
		||||
	len = sizeof(transport->recv.xid) - transport->recv.offset;
 | 
			
		||||
	dprintk("RPC:       reading XID (%zu bytes)\n", len);
 | 
			
		||||
	p = ((char *) &transport->recv.xid) + transport->recv.offset;
 | 
			
		||||
	used = xdr_skb_read_bits(desc, p, len);
 | 
			
		||||
	transport->recv.offset += used;
 | 
			
		||||
	if (used != len)
 | 
			
		||||
		return;
 | 
			
		||||
	transport->recv.flags &= ~TCP_RCV_COPY_XID;
 | 
			
		||||
	transport->recv.flags |= TCP_RCV_READ_CALLDIR;
 | 
			
		||||
	transport->recv.copied = 4;
 | 
			
		||||
	dprintk("RPC:       reading %s XID %08x\n",
 | 
			
		||||
			(transport->recv.flags & TCP_RPC_REPLY) ? "reply for"
 | 
			
		||||
							      : "request with",
 | 
			
		||||
			ntohl(transport->recv.xid));
 | 
			
		||||
	xs_tcp_check_fraghdr(transport);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
 | 
			
		||||
				       struct xdr_skb_reader *desc)
 | 
			
		||||
{
 | 
			
		||||
	size_t len, used;
 | 
			
		||||
	u32 offset;
 | 
			
		||||
	char *p;
 | 
			
		||||
 | 
			
		||||
	/*
 | 
			
		||||
	 * We want transport->recv.offset to be 8 at the end of this routine
 | 
			
		||||
	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
 | 
			
		||||
	 * When this function is called for the first time,
 | 
			
		||||
	 * transport->recv.offset is 4 (after having already read the xid).
 | 
			
		||||
	 */
 | 
			
		||||
	offset = transport->recv.offset - sizeof(transport->recv.xid);
 | 
			
		||||
	len = sizeof(transport->recv.calldir) - offset;
 | 
			
		||||
	dprintk("RPC:       reading CALL/REPLY flag (%zu bytes)\n", len);
 | 
			
		||||
	p = ((char *) &transport->recv.calldir) + offset;
 | 
			
		||||
	used = xdr_skb_read_bits(desc, p, len);
 | 
			
		||||
	transport->recv.offset += used;
 | 
			
		||||
	if (used != len)
 | 
			
		||||
		return;
 | 
			
		||||
	transport->recv.flags &= ~TCP_RCV_READ_CALLDIR;
 | 
			
		||||
	/*
 | 
			
		||||
	 * We don't yet have the XDR buffer, so we will write the calldir
 | 
			
		||||
	 * out after we get the buffer from the 'struct rpc_rqst'
 | 
			
		||||
	 */
 | 
			
		||||
	switch (ntohl(transport->recv.calldir)) {
 | 
			
		||||
	case RPC_REPLY:
 | 
			
		||||
		transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
 | 
			
		||||
		transport->recv.flags |= TCP_RCV_COPY_DATA;
 | 
			
		||||
		transport->recv.flags |= TCP_RPC_REPLY;
 | 
			
		||||
		break;
 | 
			
		||||
	case RPC_CALL:
 | 
			
		||||
		transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
 | 
			
		||||
		transport->recv.flags |= TCP_RCV_COPY_DATA;
 | 
			
		||||
		transport->recv.flags &= ~TCP_RPC_REPLY;
 | 
			
		||||
		break;
 | 
			
		||||
	default:
 | 
			
		||||
		dprintk("RPC:       invalid request message type\n");
 | 
			
		||||
		xs_tcp_force_close(&transport->xprt);
 | 
			
		||||
	}
 | 
			
		||||
	xs_tcp_check_fraghdr(transport);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
 | 
			
		||||
				     struct xdr_skb_reader *desc,
 | 
			
		||||
				     struct rpc_rqst *req)
 | 
			
		||||
{
 | 
			
		||||
	struct sock_xprt *transport =
 | 
			
		||||
				container_of(xprt, struct sock_xprt, xprt);
 | 
			
		||||
	struct xdr_buf *rcvbuf;
 | 
			
		||||
	size_t len;
 | 
			
		||||
	ssize_t r;
 | 
			
		||||
 | 
			
		||||
	rcvbuf = &req->rq_private_buf;
 | 
			
		||||
 | 
			
		||||
	if (transport->recv.flags & TCP_RCV_COPY_CALLDIR) {
 | 
			
		||||
		/*
 | 
			
		||||
		 * Save the RPC direction in the XDR buffer
 | 
			
		||||
		 */
 | 
			
		||||
		memcpy(rcvbuf->head[0].iov_base + transport->recv.copied,
 | 
			
		||||
			&transport->recv.calldir,
 | 
			
		||||
			sizeof(transport->recv.calldir));
 | 
			
		||||
		transport->recv.copied += sizeof(transport->recv.calldir);
 | 
			
		||||
		transport->recv.flags &= ~TCP_RCV_COPY_CALLDIR;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	len = desc->count;
 | 
			
		||||
	if (len > transport->recv.len - transport->recv.offset)
 | 
			
		||||
		desc->count = transport->recv.len - transport->recv.offset;
 | 
			
		||||
	r = xdr_partial_copy_from_skb(rcvbuf, transport->recv.copied,
 | 
			
		||||
					  desc, xdr_skb_read_bits);
 | 
			
		||||
 | 
			
		||||
	if (desc->count) {
 | 
			
		||||
		/* Error when copying to the receive buffer,
 | 
			
		||||
		 * usually because we weren't able to allocate
 | 
			
		||||
		 * additional buffer pages. All we can do now
 | 
			
		||||
		 * is turn off TCP_RCV_COPY_DATA, so the request
 | 
			
		||||
		 * will not receive any additional updates,
 | 
			
		||||
		 * and time out.
 | 
			
		||||
		 * Any remaining data from this record will
 | 
			
		||||
		 * be discarded.
 | 
			
		||||
		 */
 | 
			
		||||
		transport->recv.flags &= ~TCP_RCV_COPY_DATA;
 | 
			
		||||
		dprintk("RPC:       XID %08x truncated request\n",
 | 
			
		||||
				ntohl(transport->recv.xid));
 | 
			
		||||
		dprintk("RPC:       xprt = %p, recv.copied = %lu, "
 | 
			
		||||
				"recv.offset = %u, recv.len = %u\n",
 | 
			
		||||
				xprt, transport->recv.copied,
 | 
			
		||||
				transport->recv.offset, transport->recv.len);
 | 
			
		||||
		return;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	transport->recv.copied += r;
 | 
			
		||||
	transport->recv.offset += r;
 | 
			
		||||
	desc->count = len - r;
 | 
			
		||||
 | 
			
		||||
	dprintk("RPC:       XID %08x read %zd bytes\n",
 | 
			
		||||
			ntohl(transport->recv.xid), r);
 | 
			
		||||
	dprintk("RPC:       xprt = %p, recv.copied = %lu, recv.offset = %u, "
 | 
			
		||||
			"recv.len = %u\n", xprt, transport->recv.copied,
 | 
			
		||||
			transport->recv.offset, transport->recv.len);
 | 
			
		||||
 | 
			
		||||
	if (transport->recv.copied == req->rq_private_buf.buflen)
 | 
			
		||||
		transport->recv.flags &= ~TCP_RCV_COPY_DATA;
 | 
			
		||||
	else if (transport->recv.offset == transport->recv.len) {
 | 
			
		||||
		if (transport->recv.flags & TCP_RCV_LAST_FRAG)
 | 
			
		||||
			transport->recv.flags &= ~TCP_RCV_COPY_DATA;
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * Finds the request corresponding to the RPC xid and invokes the common
 | 
			
		||||
 * tcp read code to read the data.
 | 
			
		||||
 */
 | 
			
		||||
static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
 | 
			
		||||
				    struct xdr_skb_reader *desc)
 | 
			
		||||
{
 | 
			
		||||
	struct sock_xprt *transport =
 | 
			
		||||
				container_of(xprt, struct sock_xprt, xprt);
 | 
			
		||||
	struct rpc_rqst *req;
 | 
			
		||||
 | 
			
		||||
	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->recv.xid));
 | 
			
		||||
 | 
			
		||||
	/* Find and lock the request corresponding to this xid */
 | 
			
		||||
	spin_lock(&xprt->queue_lock);
 | 
			
		||||
	req = xprt_lookup_rqst(xprt, transport->recv.xid);
 | 
			
		||||
	if (!req) {
 | 
			
		||||
		dprintk("RPC:       XID %08x request not found!\n",
 | 
			
		||||
				ntohl(transport->recv.xid));
 | 
			
		||||
		spin_unlock(&xprt->queue_lock);
 | 
			
		||||
		return -1;
 | 
			
		||||
	}
 | 
			
		||||
	xprt_pin_rqst(req);
 | 
			
		||||
	spin_unlock(&xprt->queue_lock);
 | 
			
		||||
 | 
			
		||||
	xs_tcp_read_common(xprt, desc, req);
 | 
			
		||||
 | 
			
		||||
	spin_lock(&xprt->queue_lock);
 | 
			
		||||
	if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
 | 
			
		||||
		xprt_complete_rqst(req->rq_task, transport->recv.copied);
 | 
			
		||||
	xprt_unpin_rqst(req);
 | 
			
		||||
	spin_unlock(&xprt->queue_lock);
 | 
			
		||||
	return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 | 
			
		||||
/*
 | 
			
		||||
 * Obtains an rpc_rqst previously allocated and invokes the common
 | 
			
		||||
 * tcp read code to read the data.  The result is placed in the callback
 | 
			
		||||
 * queue.
 | 
			
		||||
 * If we're unable to obtain the rpc_rqst we schedule the closing of the
 | 
			
		||||
 * connection and return -1.
 | 
			
		||||
 */
 | 
			
		||||
static int xs_tcp_read_callback(struct rpc_xprt *xprt,
 | 
			
		||||
				       struct xdr_skb_reader *desc)
 | 
			
		||||
{
 | 
			
		||||
	struct sock_xprt *transport =
 | 
			
		||||
				container_of(xprt, struct sock_xprt, xprt);
 | 
			
		||||
	struct rpc_rqst *req;
 | 
			
		||||
 | 
			
		||||
	/* Look up the request corresponding to the given XID */
 | 
			
		||||
	req = xprt_lookup_bc_request(xprt, transport->recv.xid);
 | 
			
		||||
	if (req == NULL) {
 | 
			
		||||
		printk(KERN_WARNING "Callback slot table overflowed\n");
 | 
			
		||||
		xprt_force_disconnect(xprt);
 | 
			
		||||
		return -1;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
 | 
			
		||||
	xs_tcp_read_common(xprt, desc, req);
 | 
			
		||||
 | 
			
		||||
	if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
 | 
			
		||||
		xprt_complete_bc_request(req, transport->recv.copied);
 | 
			
		||||
 | 
			
		||||
	return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
 | 
			
		||||
					struct xdr_skb_reader *desc)
 | 
			
		||||
{
 | 
			
		||||
	struct sock_xprt *transport =
 | 
			
		||||
				container_of(xprt, struct sock_xprt, xprt);
 | 
			
		||||
 | 
			
		||||
	return (transport->recv.flags & TCP_RPC_REPLY) ?
 | 
			
		||||
		xs_tcp_read_reply(xprt, desc) :
 | 
			
		||||
		xs_tcp_read_callback(xprt, desc);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
 | 
			
		||||
{
 | 
			
		||||
	int ret;
 | 
			
		||||
| 
						 | 
				
			
			@ -1429,106 +1496,14 @@ static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
 | 
			
		|||
{
 | 
			
		||||
	return PAGE_SIZE;
 | 
			
		||||
}
 | 
			
		||||
#else
 | 
			
		||||
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
 | 
			
		||||
					struct xdr_skb_reader *desc)
 | 
			
		||||
{
 | 
			
		||||
	return xs_tcp_read_reply(xprt, desc);
 | 
			
		||||
}
 | 
			
		||||
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * Read data off the transport.  This can be either an RPC_CALL or an
 | 
			
		||||
 * RPC_REPLY.  Relay the processing to helper functions.
 | 
			
		||||
 */
 | 
			
		||||
static void xs_tcp_read_data(struct rpc_xprt *xprt,
 | 
			
		||||
				    struct xdr_skb_reader *desc)
 | 
			
		||||
{
 | 
			
		||||
	struct sock_xprt *transport =
 | 
			
		||||
				container_of(xprt, struct sock_xprt, xprt);
 | 
			
		||||
 | 
			
		||||
	if (_xs_tcp_read_data(xprt, desc) == 0)
 | 
			
		||||
		xs_tcp_check_fraghdr(transport);
 | 
			
		||||
	else {
 | 
			
		||||
		/*
 | 
			
		||||
		 * The transport_lock protects the request handling.
 | 
			
		||||
		 * There's no need to hold it to update the recv.flags.
 | 
			
		||||
		 */
 | 
			
		||||
		transport->recv.flags &= ~TCP_RCV_COPY_DATA;
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
 | 
			
		||||
{
 | 
			
		||||
	size_t len;
 | 
			
		||||
 | 
			
		||||
	len = transport->recv.len - transport->recv.offset;
 | 
			
		||||
	if (len > desc->count)
 | 
			
		||||
		len = desc->count;
 | 
			
		||||
	desc->count -= len;
 | 
			
		||||
	desc->offset += len;
 | 
			
		||||
	transport->recv.offset += len;
 | 
			
		||||
	dprintk("RPC:       discarded %zu bytes\n", len);
 | 
			
		||||
	xs_tcp_check_fraghdr(transport);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
 | 
			
		||||
{
 | 
			
		||||
	struct rpc_xprt *xprt = rd_desc->arg.data;
 | 
			
		||||
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 | 
			
		||||
	struct xdr_skb_reader desc = {
 | 
			
		||||
		.skb	= skb,
 | 
			
		||||
		.offset	= offset,
 | 
			
		||||
		.count	= len,
 | 
			
		||||
	};
 | 
			
		||||
	size_t ret;
 | 
			
		||||
 | 
			
		||||
	dprintk("RPC:       xs_tcp_data_recv started\n");
 | 
			
		||||
	do {
 | 
			
		||||
		trace_xs_tcp_data_recv(transport);
 | 
			
		||||
		/* Read in a new fragment marker if necessary */
 | 
			
		||||
		/* Can we ever really expect to get completely empty fragments? */
 | 
			
		||||
		if (transport->recv.flags & TCP_RCV_COPY_FRAGHDR) {
 | 
			
		||||
			xs_tcp_read_fraghdr(xprt, &desc);
 | 
			
		||||
			continue;
 | 
			
		||||
		}
 | 
			
		||||
		/* Read in the xid if necessary */
 | 
			
		||||
		if (transport->recv.flags & TCP_RCV_COPY_XID) {
 | 
			
		||||
			xs_tcp_read_xid(transport, &desc);
 | 
			
		||||
			continue;
 | 
			
		||||
		}
 | 
			
		||||
		/* Read in the call/reply flag */
 | 
			
		||||
		if (transport->recv.flags & TCP_RCV_READ_CALLDIR) {
 | 
			
		||||
			xs_tcp_read_calldir(transport, &desc);
 | 
			
		||||
			continue;
 | 
			
		||||
		}
 | 
			
		||||
		/* Read in the request data */
 | 
			
		||||
		if (transport->recv.flags & TCP_RCV_COPY_DATA) {
 | 
			
		||||
			xs_tcp_read_data(xprt, &desc);
 | 
			
		||||
			continue;
 | 
			
		||||
		}
 | 
			
		||||
		/* Skip over any trailing bytes on short reads */
 | 
			
		||||
		xs_tcp_read_discard(transport, &desc);
 | 
			
		||||
	} while (desc.count);
 | 
			
		||||
	ret = len - desc.count;
 | 
			
		||||
	if (ret < rd_desc->count)
 | 
			
		||||
		rd_desc->count -= ret;
 | 
			
		||||
	else
 | 
			
		||||
		rd_desc->count = 0;
 | 
			
		||||
	trace_xs_tcp_data_recv(transport);
 | 
			
		||||
	dprintk("RPC:       xs_tcp_data_recv done\n");
 | 
			
		||||
	return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void xs_tcp_data_receive(struct sock_xprt *transport)
 | 
			
		||||
{
 | 
			
		||||
	struct rpc_xprt *xprt = &transport->xprt;
 | 
			
		||||
	struct sock *sk;
 | 
			
		||||
	read_descriptor_t rd_desc = {
 | 
			
		||||
		.arg.data = xprt,
 | 
			
		||||
	};
 | 
			
		||||
	unsigned long total = 0;
 | 
			
		||||
	int read = 0;
 | 
			
		||||
	size_t read = 0;
 | 
			
		||||
	ssize_t ret = 0;
 | 
			
		||||
 | 
			
		||||
restart:
 | 
			
		||||
	mutex_lock(&transport->recv_mutex);
 | 
			
		||||
| 
						 | 
				
			
			@ -1536,18 +1511,12 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
 | 
			
		|||
	if (sk == NULL)
 | 
			
		||||
		goto out;
 | 
			
		||||
 | 
			
		||||
	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
 | 
			
		||||
	for (;;) {
 | 
			
		||||
		rd_desc.count = RPC_TCP_READ_CHUNK_SZ;
 | 
			
		||||
		lock_sock(sk);
 | 
			
		||||
		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
 | 
			
		||||
		if (rd_desc.count != 0 || read < 0) {
 | 
			
		||||
			clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
 | 
			
		||||
			release_sock(sk);
 | 
			
		||||
		clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
 | 
			
		||||
		ret = xs_read_stream(transport, MSG_DONTWAIT | MSG_NOSIGNAL);
 | 
			
		||||
		if (ret < 0)
 | 
			
		||||
			break;
 | 
			
		||||
		}
 | 
			
		||||
		release_sock(sk);
 | 
			
		||||
		total += read;
 | 
			
		||||
		read += ret;
 | 
			
		||||
		if (need_resched()) {
 | 
			
		||||
			mutex_unlock(&transport->recv_mutex);
 | 
			
		||||
			cond_resched();
 | 
			
		||||
| 
						 | 
				
			
			@ -1558,7 +1527,7 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
 | 
			
		|||
		queue_work(xprtiod_workqueue, &transport->recv_worker);
 | 
			
		||||
out:
 | 
			
		||||
	mutex_unlock(&transport->recv_mutex);
 | 
			
		||||
	trace_xs_tcp_data_ready(xprt, read, total);
 | 
			
		||||
	trace_xs_tcp_data_ready(xprt, ret, read);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void xs_tcp_data_receive_workfn(struct work_struct *work)
 | 
			
		||||
| 
						 | 
				
			
			@ -2380,7 +2349,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 | 
			
		|||
	transport->recv.offset = 0;
 | 
			
		||||
	transport->recv.len = 0;
 | 
			
		||||
	transport->recv.copied = 0;
 | 
			
		||||
	transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
 | 
			
		||||
	transport->xmit.offset = 0;
 | 
			
		||||
 | 
			
		||||
	/* Tell the socket layer to start connecting... */
 | 
			
		||||
| 
						 | 
				
			
			@ -2802,6 +2770,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = {
 | 
			
		|||
	.connect		= xs_connect,
 | 
			
		||||
	.buf_alloc		= rpc_malloc,
 | 
			
		||||
	.buf_free		= rpc_free,
 | 
			
		||||
	.prepare_request	= xs_stream_prepare_request,
 | 
			
		||||
	.send_request		= xs_tcp_send_request,
 | 
			
		||||
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
 | 
			
		||||
	.close			= xs_tcp_shutdown,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in a new issue