mirror of
				https://github.com/torvalds/linux.git
				synced 2025-11-04 02:30:34 +02:00 
			
		
		
		
	rxrpc: Implement a mechanism to send an event notification to a connection
Provide a means by which an event notification can be sent to a connection through such that the I/O thread can pick it up and handle it rather than doing it in a separate workqueue. This is then used to move the deferred final ACK of a call into the I/O thread rather than a separate work queue as part of the drive to do all transmission from the I/O thread. Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org
This commit is contained in:
		
							parent
							
								
									03fc55adf8
								
							
						
					
					
						commit
						f2cce89a07
					
				
					 6 changed files with 55 additions and 9 deletions
				
			
		| 
						 | 
					@ -111,7 +111,7 @@
 | 
				
			||||||
	EM(rxrpc_conn_get_call_input,		"GET inp-call") \
 | 
						EM(rxrpc_conn_get_call_input,		"GET inp-call") \
 | 
				
			||||||
	EM(rxrpc_conn_get_conn_input,		"GET inp-conn") \
 | 
						EM(rxrpc_conn_get_conn_input,		"GET inp-conn") \
 | 
				
			||||||
	EM(rxrpc_conn_get_idle,			"GET idle    ") \
 | 
						EM(rxrpc_conn_get_idle,			"GET idle    ") \
 | 
				
			||||||
	EM(rxrpc_conn_get_poke,			"GET poke    ") \
 | 
						EM(rxrpc_conn_get_poke_timer,		"GET poke    ") \
 | 
				
			||||||
	EM(rxrpc_conn_get_service_conn,		"GET svc-conn") \
 | 
						EM(rxrpc_conn_get_service_conn,		"GET svc-conn") \
 | 
				
			||||||
	EM(rxrpc_conn_new_client,		"NEW client  ") \
 | 
						EM(rxrpc_conn_new_client,		"NEW client  ") \
 | 
				
			||||||
	EM(rxrpc_conn_new_service,		"NEW service ") \
 | 
						EM(rxrpc_conn_new_service,		"NEW service ") \
 | 
				
			||||||
| 
						 | 
					@ -126,10 +126,9 @@
 | 
				
			||||||
	EM(rxrpc_conn_put_service_reaped,	"PUT svc-reap") \
 | 
						EM(rxrpc_conn_put_service_reaped,	"PUT svc-reap") \
 | 
				
			||||||
	EM(rxrpc_conn_put_unbundle,		"PUT unbundle") \
 | 
						EM(rxrpc_conn_put_unbundle,		"PUT unbundle") \
 | 
				
			||||||
	EM(rxrpc_conn_put_unidle,		"PUT unidle  ") \
 | 
						EM(rxrpc_conn_put_unidle,		"PUT unidle  ") \
 | 
				
			||||||
 | 
						EM(rxrpc_conn_put_work,			"PUT work    ") \
 | 
				
			||||||
	EM(rxrpc_conn_queue_challenge,		"QUE chall   ") \
 | 
						EM(rxrpc_conn_queue_challenge,		"QUE chall   ") \
 | 
				
			||||||
	EM(rxrpc_conn_queue_retry_work,		"QUE retry-wk") \
 | 
					 | 
				
			||||||
	EM(rxrpc_conn_queue_rx_work,		"QUE rx-work ") \
 | 
						EM(rxrpc_conn_queue_rx_work,		"QUE rx-work ") \
 | 
				
			||||||
	EM(rxrpc_conn_queue_timer,		"QUE timer   ") \
 | 
					 | 
				
			||||||
	EM(rxrpc_conn_see_new_service_conn,	"SEE new-svc ") \
 | 
						EM(rxrpc_conn_see_new_service_conn,	"SEE new-svc ") \
 | 
				
			||||||
	EM(rxrpc_conn_see_reap_service,		"SEE reap-svc") \
 | 
						EM(rxrpc_conn_see_reap_service,		"SEE reap-svc") \
 | 
				
			||||||
	E_(rxrpc_conn_see_work,			"SEE work    ")
 | 
						E_(rxrpc_conn_see_work,			"SEE work    ")
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -202,6 +202,7 @@ struct rxrpc_host_header {
 | 
				
			||||||
 * - max 48 bytes (struct sk_buff::cb)
 | 
					 * - max 48 bytes (struct sk_buff::cb)
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
struct rxrpc_skb_priv {
 | 
					struct rxrpc_skb_priv {
 | 
				
			||||||
 | 
						struct rxrpc_connection *conn;	/* Connection referred to (poke packet) */
 | 
				
			||||||
	u16		offset;		/* Offset of data */
 | 
						u16		offset;		/* Offset of data */
 | 
				
			||||||
	u16		len;		/* Length of data */
 | 
						u16		len;		/* Length of data */
 | 
				
			||||||
	u8		flags;
 | 
						u8		flags;
 | 
				
			||||||
| 
						 | 
					@ -292,6 +293,7 @@ struct rxrpc_local {
 | 
				
			||||||
	struct rxrpc_sock __rcu	*service;	/* Service(s) listening on this endpoint */
 | 
						struct rxrpc_sock __rcu	*service;	/* Service(s) listening on this endpoint */
 | 
				
			||||||
	struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */
 | 
						struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */
 | 
				
			||||||
	struct sk_buff_head	rx_queue;	/* Received packets */
 | 
						struct sk_buff_head	rx_queue;	/* Received packets */
 | 
				
			||||||
 | 
						struct list_head	conn_attend_q;	/* Conns requiring immediate attention */
 | 
				
			||||||
	struct list_head	call_attend_q;	/* Calls requiring immediate attention */
 | 
						struct list_head	call_attend_q;	/* Calls requiring immediate attention */
 | 
				
			||||||
	struct rb_root		client_bundles;	/* Client connection bundles by socket params */
 | 
						struct rb_root		client_bundles;	/* Client connection bundles by socket params */
 | 
				
			||||||
	spinlock_t		client_bundles_lock; /* Lock for client_bundles */
 | 
						spinlock_t		client_bundles_lock; /* Lock for client_bundles */
 | 
				
			||||||
| 
						 | 
					@ -441,6 +443,7 @@ struct rxrpc_connection {
 | 
				
			||||||
	struct rxrpc_peer	*peer;		/* Remote endpoint */
 | 
						struct rxrpc_peer	*peer;		/* Remote endpoint */
 | 
				
			||||||
	struct rxrpc_net	*rxnet;		/* Network namespace to which call belongs */
 | 
						struct rxrpc_net	*rxnet;		/* Network namespace to which call belongs */
 | 
				
			||||||
	struct key		*key;		/* Security details */
 | 
						struct key		*key;		/* Security details */
 | 
				
			||||||
 | 
						struct list_head	attend_link;	/* Link in local->conn_attend_q */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	refcount_t		ref;
 | 
						refcount_t		ref;
 | 
				
			||||||
	atomic_t		active;		/* Active count for service conns */
 | 
						atomic_t		active;		/* Active count for service conns */
 | 
				
			||||||
| 
						 | 
					@ -905,6 +908,7 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, struct sk_buff *s
 | 
				
			||||||
void rxrpc_process_connection(struct work_struct *);
 | 
					void rxrpc_process_connection(struct work_struct *);
 | 
				
			||||||
void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool);
 | 
					void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool);
 | 
				
			||||||
int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb);
 | 
					int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb);
 | 
				
			||||||
 | 
					void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/*
 | 
					/*
 | 
				
			||||||
 * conn_object.c
 | 
					 * conn_object.c
 | 
				
			||||||
| 
						 | 
					@ -912,6 +916,7 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb);
 | 
				
			||||||
extern unsigned int rxrpc_connection_expiry;
 | 
					extern unsigned int rxrpc_connection_expiry;
 | 
				
			||||||
extern unsigned int rxrpc_closed_conn_expiry;
 | 
					extern unsigned int rxrpc_closed_conn_expiry;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why);
 | 
				
			||||||
struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *, gfp_t);
 | 
					struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *, gfp_t);
 | 
				
			||||||
struct rxrpc_connection *rxrpc_find_client_connection_rcu(struct rxrpc_local *,
 | 
					struct rxrpc_connection *rxrpc_find_client_connection_rcu(struct rxrpc_local *,
 | 
				
			||||||
							  struct sockaddr_rxrpc *,
 | 
												  struct sockaddr_rxrpc *,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -412,10 +412,6 @@ static void rxrpc_do_process_connection(struct rxrpc_connection *conn)
 | 
				
			||||||
	if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events))
 | 
						if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events))
 | 
				
			||||||
		rxrpc_secure_connection(conn);
 | 
							rxrpc_secure_connection(conn);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* Process delayed ACKs whose time has come. */
 | 
					 | 
				
			||||||
	if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
 | 
					 | 
				
			||||||
		rxrpc_process_delayed_final_acks(conn, false);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	/* go through the conn-level event packets, releasing the ref on this
 | 
						/* go through the conn-level event packets, releasing the ref on this
 | 
				
			||||||
	 * connection that each one has when we've finished with it */
 | 
						 * connection that each one has when we've finished with it */
 | 
				
			||||||
	while ((skb = skb_dequeue(&conn->rx_queue))) {
 | 
						while ((skb = skb_dequeue(&conn->rx_queue))) {
 | 
				
			||||||
| 
						 | 
					@ -515,3 +511,13 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
 | 
				
			||||||
		return -EPROTO;
 | 
							return -EPROTO;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					 * Input a connection event.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						/* Process delayed ACKs whose time has come. */
 | 
				
			||||||
 | 
						if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
 | 
				
			||||||
 | 
							rxrpc_process_delayed_final_acks(conn, false);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -23,12 +23,30 @@ static void rxrpc_clean_up_connection(struct work_struct *work);
 | 
				
			||||||
static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet,
 | 
					static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet,
 | 
				
			||||||
					 unsigned long reap_at);
 | 
										 unsigned long reap_at);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						struct rxrpc_local *local = conn->local;
 | 
				
			||||||
 | 
						bool busy;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if (WARN_ON_ONCE(!local))
 | 
				
			||||||
 | 
							return;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						spin_lock_bh(&local->lock);
 | 
				
			||||||
 | 
						busy = !list_empty(&conn->attend_link);
 | 
				
			||||||
 | 
						if (!busy) {
 | 
				
			||||||
 | 
							rxrpc_get_connection(conn, why);
 | 
				
			||||||
 | 
							list_add_tail(&conn->attend_link, &local->conn_attend_q);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						spin_unlock_bh(&local->lock);
 | 
				
			||||||
 | 
						rxrpc_wake_up_io_thread(local);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void rxrpc_connection_timer(struct timer_list *timer)
 | 
					static void rxrpc_connection_timer(struct timer_list *timer)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct rxrpc_connection *conn =
 | 
						struct rxrpc_connection *conn =
 | 
				
			||||||
		container_of(timer, struct rxrpc_connection, timer);
 | 
							container_of(timer, struct rxrpc_connection, timer);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	rxrpc_queue_conn(conn, rxrpc_conn_queue_timer);
 | 
						rxrpc_poke_conn(conn, rxrpc_conn_get_poke_timer);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/*
 | 
					/*
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -421,6 +421,7 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
int rxrpc_io_thread(void *data)
 | 
					int rxrpc_io_thread(void *data)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
						struct rxrpc_connection *conn;
 | 
				
			||||||
	struct sk_buff_head rx_queue;
 | 
						struct sk_buff_head rx_queue;
 | 
				
			||||||
	struct rxrpc_local *local = data;
 | 
						struct rxrpc_local *local = data;
 | 
				
			||||||
	struct rxrpc_call *call;
 | 
						struct rxrpc_call *call;
 | 
				
			||||||
| 
						 | 
					@ -436,6 +437,20 @@ int rxrpc_io_thread(void *data)
 | 
				
			||||||
	for (;;) {
 | 
						for (;;) {
 | 
				
			||||||
		rxrpc_inc_stat(local->rxnet, stat_io_loop);
 | 
							rxrpc_inc_stat(local->rxnet, stat_io_loop);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							/* Deal with connections that want immediate attention. */
 | 
				
			||||||
 | 
							conn = list_first_entry_or_null(&local->conn_attend_q,
 | 
				
			||||||
 | 
											struct rxrpc_connection,
 | 
				
			||||||
 | 
											attend_link);
 | 
				
			||||||
 | 
							if (conn) {
 | 
				
			||||||
 | 
								spin_lock_bh(&local->lock);
 | 
				
			||||||
 | 
								list_del_init(&conn->attend_link);
 | 
				
			||||||
 | 
								spin_unlock_bh(&local->lock);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								rxrpc_input_conn_event(conn, NULL);
 | 
				
			||||||
 | 
								rxrpc_put_connection(conn, rxrpc_conn_put_poke);
 | 
				
			||||||
 | 
								continue;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		/* Deal with calls that want immediate attention. */
 | 
							/* Deal with calls that want immediate attention. */
 | 
				
			||||||
		if ((call = list_first_entry_or_null(&local->call_attend_q,
 | 
							if ((call = list_first_entry_or_null(&local->call_attend_q,
 | 
				
			||||||
						     struct rxrpc_call,
 | 
											     struct rxrpc_call,
 | 
				
			||||||
| 
						 | 
					@ -463,6 +478,7 @@ int rxrpc_io_thread(void *data)
 | 
				
			||||||
				rxrpc_input_error(local, skb);
 | 
									rxrpc_input_error(local, skb);
 | 
				
			||||||
				rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
 | 
									rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
 | 
				
			||||||
				break;
 | 
									break;
 | 
				
			||||||
 | 
									break;
 | 
				
			||||||
			default:
 | 
								default:
 | 
				
			||||||
				WARN_ON_ONCE(1);
 | 
									WARN_ON_ONCE(1);
 | 
				
			||||||
				rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
 | 
									rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
 | 
				
			||||||
| 
						 | 
					@ -481,7 +497,8 @@ int rxrpc_io_thread(void *data)
 | 
				
			||||||
		set_current_state(TASK_INTERRUPTIBLE);
 | 
							set_current_state(TASK_INTERRUPTIBLE);
 | 
				
			||||||
		should_stop = kthread_should_stop();
 | 
							should_stop = kthread_should_stop();
 | 
				
			||||||
		if (!skb_queue_empty(&local->rx_queue) ||
 | 
							if (!skb_queue_empty(&local->rx_queue) ||
 | 
				
			||||||
		    !list_empty(&local->call_attend_q)) {
 | 
							    !list_empty(&local->call_attend_q) ||
 | 
				
			||||||
 | 
							    !list_empty(&local->conn_attend_q)) {
 | 
				
			||||||
			__set_current_state(TASK_RUNNING);
 | 
								__set_current_state(TASK_RUNNING);
 | 
				
			||||||
			continue;
 | 
								continue;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -100,6 +100,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net,
 | 
				
			||||||
		init_rwsem(&local->defrag_sem);
 | 
							init_rwsem(&local->defrag_sem);
 | 
				
			||||||
		init_completion(&local->io_thread_ready);
 | 
							init_completion(&local->io_thread_ready);
 | 
				
			||||||
		skb_queue_head_init(&local->rx_queue);
 | 
							skb_queue_head_init(&local->rx_queue);
 | 
				
			||||||
 | 
							INIT_LIST_HEAD(&local->conn_attend_q);
 | 
				
			||||||
		INIT_LIST_HEAD(&local->call_attend_q);
 | 
							INIT_LIST_HEAD(&local->call_attend_q);
 | 
				
			||||||
		local->client_bundles = RB_ROOT;
 | 
							local->client_bundles = RB_ROOT;
 | 
				
			||||||
		spin_lock_init(&local->client_bundles_lock);
 | 
							spin_lock_init(&local->client_bundles_lock);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in a new issue