forked from mirrors/linux
		
	rxrpc: Implement a mechanism to send an event notification to a call
Provide a means by which an event notification can be sent to a call such that the I/O thread can process it rather than it being done in a separate workqueue. This will allow a lot of locking to be removed. Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org
This commit is contained in:
		
							parent
							
								
									81f2e8adc0
								
							
						
					
					
						commit
						15f661dc95
					
				
					 6 changed files with 100 additions and 5 deletions
				
			
		| 
						 | 
				
			
			@ -16,6 +16,13 @@
 | 
			
		|||
/*
 | 
			
		||||
 * Declare tracing information enums and their string mappings for display.
 | 
			
		||||
 */
 | 
			
		||||
#define rxrpc_call_poke_traces \
 | 
			
		||||
	EM(rxrpc_call_poke_error,		"Error")	\
 | 
			
		||||
	EM(rxrpc_call_poke_idle,		"Idle")		\
 | 
			
		||||
	EM(rxrpc_call_poke_start,		"Start")	\
 | 
			
		||||
	EM(rxrpc_call_poke_timer,		"Timer")	\
 | 
			
		||||
	E_(rxrpc_call_poke_timer_now,		"Timer-now")
 | 
			
		||||
 | 
			
		||||
#define rxrpc_skb_traces \
 | 
			
		||||
	EM(rxrpc_skb_eaten_by_unshare,		"ETN unshare  ") \
 | 
			
		||||
	EM(rxrpc_skb_eaten_by_unshare_nomem,	"ETN unshar-nm") \
 | 
			
		||||
| 
						 | 
				
			
			@ -150,6 +157,7 @@
 | 
			
		|||
	EM(rxrpc_call_get_input,		"GET input   ") \
 | 
			
		||||
	EM(rxrpc_call_get_kernel_service,	"GET krnl-srv") \
 | 
			
		||||
	EM(rxrpc_call_get_notify_socket,	"GET notify  ") \
 | 
			
		||||
	EM(rxrpc_call_get_poke,			"GET poke    ") \
 | 
			
		||||
	EM(rxrpc_call_get_recvmsg,		"GET recvmsg ") \
 | 
			
		||||
	EM(rxrpc_call_get_release_sock,		"GET rel-sock") \
 | 
			
		||||
	EM(rxrpc_call_get_sendmsg,		"GET sendmsg ") \
 | 
			
		||||
| 
						 | 
				
			
			@ -160,6 +168,7 @@
 | 
			
		|||
	EM(rxrpc_call_put_discard_prealloc,	"PUT disc-pre") \
 | 
			
		||||
	EM(rxrpc_call_put_input,		"PUT input   ") \
 | 
			
		||||
	EM(rxrpc_call_put_kernel,		"PUT kernel  ") \
 | 
			
		||||
	EM(rxrpc_call_put_poke,			"PUT poke    ") \
 | 
			
		||||
	EM(rxrpc_call_put_recvmsg,		"PUT recvmsg ") \
 | 
			
		||||
	EM(rxrpc_call_put_release_sock,		"PUT rls-sock") \
 | 
			
		||||
	EM(rxrpc_call_put_release_sock_tba,	"PUT rls-sk-a") \
 | 
			
		||||
| 
						 | 
				
			
			@ -378,6 +387,7 @@
 | 
			
		|||
#define E_(a, b) a
 | 
			
		||||
 | 
			
		||||
enum rxrpc_bundle_trace		{ rxrpc_bundle_traces } __mode(byte);
 | 
			
		||||
enum rxrpc_call_poke_trace	{ rxrpc_call_poke_traces } __mode(byte);
 | 
			
		||||
enum rxrpc_call_trace		{ rxrpc_call_traces } __mode(byte);
 | 
			
		||||
enum rxrpc_client_trace		{ rxrpc_client_traces } __mode(byte);
 | 
			
		||||
enum rxrpc_congest_change	{ rxrpc_congest_changes } __mode(byte);
 | 
			
		||||
| 
						 | 
				
			
			@ -408,6 +418,7 @@ enum rxrpc_txqueue_trace	{ rxrpc_txqueue_traces } __mode(byte);
 | 
			
		|||
#define E_(a, b) TRACE_DEFINE_ENUM(a);
 | 
			
		||||
 | 
			
		||||
rxrpc_bundle_traces;
 | 
			
		||||
rxrpc_call_poke_traces;
 | 
			
		||||
rxrpc_call_traces;
 | 
			
		||||
rxrpc_client_traces;
 | 
			
		||||
rxrpc_congest_changes;
 | 
			
		||||
| 
						 | 
				
			
			@ -1747,6 +1758,47 @@ TRACE_EVENT(rxrpc_txbuf,
 | 
			
		|||
		      __entry->ref)
 | 
			
		||||
	    );
 | 
			
		||||
 | 
			
		||||
TRACE_EVENT(rxrpc_poke_call,
 | 
			
		||||
	    TP_PROTO(struct rxrpc_call *call, bool busy,
 | 
			
		||||
		     enum rxrpc_call_poke_trace what),
 | 
			
		||||
 | 
			
		||||
	    TP_ARGS(call, busy, what),
 | 
			
		||||
 | 
			
		||||
	    TP_STRUCT__entry(
 | 
			
		||||
		    __field(unsigned int,		call_debug_id	)
 | 
			
		||||
		    __field(bool,			busy		)
 | 
			
		||||
		    __field(enum rxrpc_call_poke_trace,	what		)
 | 
			
		||||
			     ),
 | 
			
		||||
 | 
			
		||||
	    TP_fast_assign(
 | 
			
		||||
		    __entry->call_debug_id = call->debug_id;
 | 
			
		||||
		    __entry->busy = busy;
 | 
			
		||||
		    __entry->what = what;
 | 
			
		||||
			   ),
 | 
			
		||||
 | 
			
		||||
	    TP_printk("c=%08x %s%s",
 | 
			
		||||
		      __entry->call_debug_id,
 | 
			
		||||
		      __print_symbolic(__entry->what, rxrpc_call_poke_traces),
 | 
			
		||||
		      __entry->busy ? "!" : "")
 | 
			
		||||
	    );
 | 
			
		||||
 | 
			
		||||
TRACE_EVENT(rxrpc_call_poked,
 | 
			
		||||
	    TP_PROTO(struct rxrpc_call *call),
 | 
			
		||||
 | 
			
		||||
	    TP_ARGS(call),
 | 
			
		||||
 | 
			
		||||
	    TP_STRUCT__entry(
 | 
			
		||||
		    __field(unsigned int,		call_debug_id	)
 | 
			
		||||
			     ),
 | 
			
		||||
 | 
			
		||||
	    TP_fast_assign(
 | 
			
		||||
		    __entry->call_debug_id = call->debug_id;
 | 
			
		||||
			   ),
 | 
			
		||||
 | 
			
		||||
	    TP_printk("c=%08x",
 | 
			
		||||
		      __entry->call_debug_id)
 | 
			
		||||
	    );
 | 
			
		||||
 | 
			
		||||
#undef EM
 | 
			
		||||
#undef E_
 | 
			
		||||
#endif /* _TRACE_RXRPC_H */
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -292,6 +292,7 @@ struct rxrpc_local {
 | 
			
		|||
	struct sk_buff_head	reject_queue;	/* packets awaiting rejection */
 | 
			
		||||
	struct sk_buff_head	event_queue;	/* endpoint event packets awaiting processing */
 | 
			
		||||
	struct sk_buff_head	rx_queue;	/* Received packets */
 | 
			
		||||
	struct list_head	call_attend_q;	/* Calls requiring immediate attention */
 | 
			
		||||
	struct rb_root		client_bundles;	/* Client connection bundles by socket params */
 | 
			
		||||
	spinlock_t		client_bundles_lock; /* Lock for client_bundles */
 | 
			
		||||
	spinlock_t		lock;		/* access lock */
 | 
			
		||||
| 
						 | 
				
			
			@ -616,6 +617,7 @@ struct rxrpc_call {
 | 
			
		|||
	struct list_head	recvmsg_link;	/* Link in rx->recvmsg_q */
 | 
			
		||||
	struct list_head	sock_link;	/* Link in rx->sock_calls */
 | 
			
		||||
	struct rb_node		sock_node;	/* Node in rx->calls */
 | 
			
		||||
	struct list_head	attend_link;	/* Link in local->call_attend_q */
 | 
			
		||||
	struct rxrpc_txbuf	*tx_pending;	/* Tx buffer being filled */
 | 
			
		||||
	wait_queue_head_t	waitq;		/* Wait queue for channel or Tx */
 | 
			
		||||
	s64			tx_total_len;	/* Total length left to be transmitted (or -1) */
 | 
			
		||||
| 
						 | 
				
			
			@ -843,6 +845,7 @@ extern const char *const rxrpc_call_states[];
 | 
			
		|||
extern const char *const rxrpc_call_completions[];
 | 
			
		||||
extern struct kmem_cache *rxrpc_call_jar;
 | 
			
		||||
 | 
			
		||||
void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what);
 | 
			
		||||
struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *, unsigned long);
 | 
			
		||||
struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *, gfp_t, unsigned int);
 | 
			
		||||
struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
 | 
			
		||||
| 
						 | 
				
			
			@ -951,7 +954,7 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
 | 
			
		|||
/*
 | 
			
		||||
 * input.c
 | 
			
		||||
 */
 | 
			
		||||
void rxrpc_input_call_packet(struct rxrpc_call *, struct sk_buff *);
 | 
			
		||||
void rxrpc_input_call_event(struct rxrpc_call *, struct sk_buff *);
 | 
			
		||||
void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection *,
 | 
			
		||||
				   struct rxrpc_call *);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -45,6 +45,29 @@ static struct semaphore rxrpc_call_limiter =
 | 
			
		|||
static struct semaphore rxrpc_kernel_call_limiter =
 | 
			
		||||
	__SEMAPHORE_INITIALIZER(rxrpc_kernel_call_limiter, 1000);
 | 
			
		||||
 | 
			
		||||
void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
 | 
			
		||||
{
 | 
			
		||||
	struct rxrpc_local *local;
 | 
			
		||||
	struct rxrpc_peer *peer = call->peer;
 | 
			
		||||
	bool busy;
 | 
			
		||||
 | 
			
		||||
	if (WARN_ON_ONCE(!peer))
 | 
			
		||||
		return;
 | 
			
		||||
	local = peer->local;
 | 
			
		||||
 | 
			
		||||
	if (call->state < RXRPC_CALL_COMPLETE) {
 | 
			
		||||
		spin_lock_bh(&local->lock);
 | 
			
		||||
		busy = !list_empty(&call->attend_link);
 | 
			
		||||
		trace_rxrpc_poke_call(call, busy, what);
 | 
			
		||||
		if (!busy) {
 | 
			
		||||
			rxrpc_get_call(call, rxrpc_call_get_poke);
 | 
			
		||||
			list_add_tail(&call->attend_link, &local->call_attend_q);
 | 
			
		||||
		}
 | 
			
		||||
		spin_unlock_bh(&local->lock);
 | 
			
		||||
		rxrpc_wake_up_io_thread(local);
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void rxrpc_call_timer_expired(struct timer_list *t)
 | 
			
		||||
{
 | 
			
		||||
	struct rxrpc_call *call = from_timer(call, t, timer);
 | 
			
		||||
| 
						 | 
				
			
			@ -137,6 +160,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 | 
			
		|||
	INIT_LIST_HEAD(&call->accept_link);
 | 
			
		||||
	INIT_LIST_HEAD(&call->recvmsg_link);
 | 
			
		||||
	INIT_LIST_HEAD(&call->sock_link);
 | 
			
		||||
	INIT_LIST_HEAD(&call->attend_link);
 | 
			
		||||
	INIT_LIST_HEAD(&call->tx_buffer);
 | 
			
		||||
	skb_queue_head_init(&call->recvmsg_queue);
 | 
			
		||||
	skb_queue_head_init(&call->rx_oos_queue);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1017,8 +1017,7 @@ static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
 | 
			
		|||
/*
 | 
			
		||||
 * Process an incoming call packet.
 | 
			
		||||
 */
 | 
			
		||||
void rxrpc_input_call_packet(struct rxrpc_call *call,
 | 
			
		||||
				    struct sk_buff *skb)
 | 
			
		||||
void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
 | 
			
		||||
{
 | 
			
		||||
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 | 
			
		||||
	unsigned long timo;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -366,7 +366,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff *skb)
 | 
			
		|||
	/* Process a call packet; this either discards or passes on the ref
 | 
			
		||||
	 * elsewhere.
 | 
			
		||||
	 */
 | 
			
		||||
	rxrpc_input_call_packet(call, skb);
 | 
			
		||||
	rxrpc_input_call_event(call, skb);
 | 
			
		||||
	goto out;
 | 
			
		||||
 | 
			
		||||
discard:
 | 
			
		||||
| 
						 | 
				
			
			@ -413,6 +413,7 @@ int rxrpc_io_thread(void *data)
 | 
			
		|||
{
 | 
			
		||||
	struct sk_buff_head rx_queue;
 | 
			
		||||
	struct rxrpc_local *local = data;
 | 
			
		||||
	struct rxrpc_call *call;
 | 
			
		||||
	struct sk_buff *skb;
 | 
			
		||||
 | 
			
		||||
	skb_queue_head_init(&rx_queue);
 | 
			
		||||
| 
						 | 
				
			
			@ -422,6 +423,20 @@ int rxrpc_io_thread(void *data)
 | 
			
		|||
	for (;;) {
 | 
			
		||||
		rxrpc_inc_stat(local->rxnet, stat_io_loop);
 | 
			
		||||
 | 
			
		||||
		/* Deal with calls that want immediate attention. */
 | 
			
		||||
		if ((call = list_first_entry_or_null(&local->call_attend_q,
 | 
			
		||||
						     struct rxrpc_call,
 | 
			
		||||
						     attend_link))) {
 | 
			
		||||
			spin_lock_bh(&local->lock);
 | 
			
		||||
			list_del_init(&call->attend_link);
 | 
			
		||||
			spin_unlock_bh(&local->lock);
 | 
			
		||||
 | 
			
		||||
			trace_rxrpc_call_poked(call);
 | 
			
		||||
			rxrpc_input_call_event(call, NULL);
 | 
			
		||||
			rxrpc_put_call(call, rxrpc_call_put_poke);
 | 
			
		||||
			continue;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		/* Process received packets and errors. */
 | 
			
		||||
		if ((skb = __skb_dequeue(&rx_queue))) {
 | 
			
		||||
			switch (skb->mark) {
 | 
			
		||||
| 
						 | 
				
			
			@ -450,7 +465,8 @@ int rxrpc_io_thread(void *data)
 | 
			
		|||
		}
 | 
			
		||||
 | 
			
		||||
		set_current_state(TASK_INTERRUPTIBLE);
 | 
			
		||||
		if (!skb_queue_empty(&local->rx_queue)) {
 | 
			
		||||
		if (!skb_queue_empty(&local->rx_queue) ||
 | 
			
		||||
		    !list_empty(&local->call_attend_q)) {
 | 
			
		||||
			__set_current_state(TASK_RUNNING);
 | 
			
		||||
			continue;
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -104,6 +104,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
 | 
			
		|||
		skb_queue_head_init(&local->reject_queue);
 | 
			
		||||
		skb_queue_head_init(&local->event_queue);
 | 
			
		||||
		skb_queue_head_init(&local->rx_queue);
 | 
			
		||||
		INIT_LIST_HEAD(&local->call_attend_q);
 | 
			
		||||
		local->client_bundles = RB_ROOT;
 | 
			
		||||
		spin_lock_init(&local->client_bundles_lock);
 | 
			
		||||
		spin_lock_init(&local->lock);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in a new issue