forked from mirrors/linux
		
	rxrpc: Implement a mechanism to send an event notification to a connection
Provide a means by which an event notification can be sent to a connection through such that the I/O thread can pick it up and handle it rather than doing it in a separate workqueue. This is then used to move the deferred final ACK of a call into the I/O thread rather than a separate work queue as part of the drive to do all transmission from the I/O thread. Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org
This commit is contained in:
		
							parent
							
								
									03fc55adf8
								
							
						
					
					
						commit
						f2cce89a07
					
				
					 6 changed files with 55 additions and 9 deletions
				
			
		|  | @ -111,7 +111,7 @@ | ||||||
| 	EM(rxrpc_conn_get_call_input,		"GET inp-call") \ | 	EM(rxrpc_conn_get_call_input,		"GET inp-call") \ | ||||||
| 	EM(rxrpc_conn_get_conn_input,		"GET inp-conn") \ | 	EM(rxrpc_conn_get_conn_input,		"GET inp-conn") \ | ||||||
| 	EM(rxrpc_conn_get_idle,			"GET idle    ") \ | 	EM(rxrpc_conn_get_idle,			"GET idle    ") \ | ||||||
| 	EM(rxrpc_conn_get_poke,			"GET poke    ") \ | 	EM(rxrpc_conn_get_poke_timer,		"GET poke    ") \ | ||||||
| 	EM(rxrpc_conn_get_service_conn,		"GET svc-conn") \ | 	EM(rxrpc_conn_get_service_conn,		"GET svc-conn") \ | ||||||
| 	EM(rxrpc_conn_new_client,		"NEW client  ") \ | 	EM(rxrpc_conn_new_client,		"NEW client  ") \ | ||||||
| 	EM(rxrpc_conn_new_service,		"NEW service ") \ | 	EM(rxrpc_conn_new_service,		"NEW service ") \ | ||||||
|  | @ -126,10 +126,9 @@ | ||||||
| 	EM(rxrpc_conn_put_service_reaped,	"PUT svc-reap") \ | 	EM(rxrpc_conn_put_service_reaped,	"PUT svc-reap") \ | ||||||
| 	EM(rxrpc_conn_put_unbundle,		"PUT unbundle") \ | 	EM(rxrpc_conn_put_unbundle,		"PUT unbundle") \ | ||||||
| 	EM(rxrpc_conn_put_unidle,		"PUT unidle  ") \ | 	EM(rxrpc_conn_put_unidle,		"PUT unidle  ") \ | ||||||
|  | 	EM(rxrpc_conn_put_work,			"PUT work    ") \ | ||||||
| 	EM(rxrpc_conn_queue_challenge,		"QUE chall   ") \ | 	EM(rxrpc_conn_queue_challenge,		"QUE chall   ") \ | ||||||
| 	EM(rxrpc_conn_queue_retry_work,		"QUE retry-wk") \ |  | ||||||
| 	EM(rxrpc_conn_queue_rx_work,		"QUE rx-work ") \ | 	EM(rxrpc_conn_queue_rx_work,		"QUE rx-work ") \ | ||||||
| 	EM(rxrpc_conn_queue_timer,		"QUE timer   ") \ |  | ||||||
| 	EM(rxrpc_conn_see_new_service_conn,	"SEE new-svc ") \ | 	EM(rxrpc_conn_see_new_service_conn,	"SEE new-svc ") \ | ||||||
| 	EM(rxrpc_conn_see_reap_service,		"SEE reap-svc") \ | 	EM(rxrpc_conn_see_reap_service,		"SEE reap-svc") \ | ||||||
| 	E_(rxrpc_conn_see_work,			"SEE work    ") | 	E_(rxrpc_conn_see_work,			"SEE work    ") | ||||||
|  |  | ||||||
|  | @ -202,6 +202,7 @@ struct rxrpc_host_header { | ||||||
|  * - max 48 bytes (struct sk_buff::cb) |  * - max 48 bytes (struct sk_buff::cb) | ||||||
|  */ |  */ | ||||||
| struct rxrpc_skb_priv { | struct rxrpc_skb_priv { | ||||||
|  | 	struct rxrpc_connection *conn;	/* Connection referred to (poke packet) */ | ||||||
| 	u16		offset;		/* Offset of data */ | 	u16		offset;		/* Offset of data */ | ||||||
| 	u16		len;		/* Length of data */ | 	u16		len;		/* Length of data */ | ||||||
| 	u8		flags; | 	u8		flags; | ||||||
|  | @ -292,6 +293,7 @@ struct rxrpc_local { | ||||||
| 	struct rxrpc_sock __rcu	*service;	/* Service(s) listening on this endpoint */ | 	struct rxrpc_sock __rcu	*service;	/* Service(s) listening on this endpoint */ | ||||||
| 	struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */ | 	struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */ | ||||||
| 	struct sk_buff_head	rx_queue;	/* Received packets */ | 	struct sk_buff_head	rx_queue;	/* Received packets */ | ||||||
|  | 	struct list_head	conn_attend_q;	/* Conns requiring immediate attention */ | ||||||
| 	struct list_head	call_attend_q;	/* Calls requiring immediate attention */ | 	struct list_head	call_attend_q;	/* Calls requiring immediate attention */ | ||||||
| 	struct rb_root		client_bundles;	/* Client connection bundles by socket params */ | 	struct rb_root		client_bundles;	/* Client connection bundles by socket params */ | ||||||
| 	spinlock_t		client_bundles_lock; /* Lock for client_bundles */ | 	spinlock_t		client_bundles_lock; /* Lock for client_bundles */ | ||||||
|  | @ -441,6 +443,7 @@ struct rxrpc_connection { | ||||||
| 	struct rxrpc_peer	*peer;		/* Remote endpoint */ | 	struct rxrpc_peer	*peer;		/* Remote endpoint */ | ||||||
| 	struct rxrpc_net	*rxnet;		/* Network namespace to which call belongs */ | 	struct rxrpc_net	*rxnet;		/* Network namespace to which call belongs */ | ||||||
| 	struct key		*key;		/* Security details */ | 	struct key		*key;		/* Security details */ | ||||||
|  | 	struct list_head	attend_link;	/* Link in local->conn_attend_q */ | ||||||
| 
 | 
 | ||||||
| 	refcount_t		ref; | 	refcount_t		ref; | ||||||
| 	atomic_t		active;		/* Active count for service conns */ | 	atomic_t		active;		/* Active count for service conns */ | ||||||
|  | @ -905,6 +908,7 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, struct sk_buff *s | ||||||
| void rxrpc_process_connection(struct work_struct *); | void rxrpc_process_connection(struct work_struct *); | ||||||
| void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool); | void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool); | ||||||
| int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb); | int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb); | ||||||
|  | void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb); | ||||||
| 
 | 
 | ||||||
| /*
 | /*
 | ||||||
|  * conn_object.c |  * conn_object.c | ||||||
|  | @ -912,6 +916,7 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb); | ||||||
| extern unsigned int rxrpc_connection_expiry; | extern unsigned int rxrpc_connection_expiry; | ||||||
| extern unsigned int rxrpc_closed_conn_expiry; | extern unsigned int rxrpc_closed_conn_expiry; | ||||||
| 
 | 
 | ||||||
|  | void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why); | ||||||
| struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *, gfp_t); | struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *, gfp_t); | ||||||
| struct rxrpc_connection *rxrpc_find_client_connection_rcu(struct rxrpc_local *, | struct rxrpc_connection *rxrpc_find_client_connection_rcu(struct rxrpc_local *, | ||||||
| 							  struct sockaddr_rxrpc *, | 							  struct sockaddr_rxrpc *, | ||||||
|  |  | ||||||
|  | @ -412,10 +412,6 @@ static void rxrpc_do_process_connection(struct rxrpc_connection *conn) | ||||||
| 	if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events)) | 	if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events)) | ||||||
| 		rxrpc_secure_connection(conn); | 		rxrpc_secure_connection(conn); | ||||||
| 
 | 
 | ||||||
| 	/* Process delayed ACKs whose time has come. */ |  | ||||||
| 	if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK) |  | ||||||
| 		rxrpc_process_delayed_final_acks(conn, false); |  | ||||||
| 
 |  | ||||||
| 	/* go through the conn-level event packets, releasing the ref on this
 | 	/* go through the conn-level event packets, releasing the ref on this
 | ||||||
| 	 * connection that each one has when we've finished with it */ | 	 * connection that each one has when we've finished with it */ | ||||||
| 	while ((skb = skb_dequeue(&conn->rx_queue))) { | 	while ((skb = skb_dequeue(&conn->rx_queue))) { | ||||||
|  | @ -515,3 +511,13 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb) | ||||||
| 		return -EPROTO; | 		return -EPROTO; | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | /*
 | ||||||
|  |  * Input a connection event. | ||||||
|  |  */ | ||||||
|  | void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb) | ||||||
|  | { | ||||||
|  | 	/* Process delayed ACKs whose time has come. */ | ||||||
|  | 	if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK) | ||||||
|  | 		rxrpc_process_delayed_final_acks(conn, false); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -23,12 +23,30 @@ static void rxrpc_clean_up_connection(struct work_struct *work); | ||||||
| static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet, | static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet, | ||||||
| 					 unsigned long reap_at); | 					 unsigned long reap_at); | ||||||
| 
 | 
 | ||||||
|  | void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why) | ||||||
|  | { | ||||||
|  | 	struct rxrpc_local *local = conn->local; | ||||||
|  | 	bool busy; | ||||||
|  | 
 | ||||||
|  | 	if (WARN_ON_ONCE(!local)) | ||||||
|  | 		return; | ||||||
|  | 
 | ||||||
|  | 	spin_lock_bh(&local->lock); | ||||||
|  | 	busy = !list_empty(&conn->attend_link); | ||||||
|  | 	if (!busy) { | ||||||
|  | 		rxrpc_get_connection(conn, why); | ||||||
|  | 		list_add_tail(&conn->attend_link, &local->conn_attend_q); | ||||||
|  | 	} | ||||||
|  | 	spin_unlock_bh(&local->lock); | ||||||
|  | 	rxrpc_wake_up_io_thread(local); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| static void rxrpc_connection_timer(struct timer_list *timer) | static void rxrpc_connection_timer(struct timer_list *timer) | ||||||
| { | { | ||||||
| 	struct rxrpc_connection *conn = | 	struct rxrpc_connection *conn = | ||||||
| 		container_of(timer, struct rxrpc_connection, timer); | 		container_of(timer, struct rxrpc_connection, timer); | ||||||
| 
 | 
 | ||||||
| 	rxrpc_queue_conn(conn, rxrpc_conn_queue_timer); | 	rxrpc_poke_conn(conn, rxrpc_conn_get_poke_timer); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /*
 | /*
 | ||||||
|  |  | ||||||
|  | @ -421,6 +421,7 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn, | ||||||
|  */ |  */ | ||||||
| int rxrpc_io_thread(void *data) | int rxrpc_io_thread(void *data) | ||||||
| { | { | ||||||
|  | 	struct rxrpc_connection *conn; | ||||||
| 	struct sk_buff_head rx_queue; | 	struct sk_buff_head rx_queue; | ||||||
| 	struct rxrpc_local *local = data; | 	struct rxrpc_local *local = data; | ||||||
| 	struct rxrpc_call *call; | 	struct rxrpc_call *call; | ||||||
|  | @ -436,6 +437,20 @@ int rxrpc_io_thread(void *data) | ||||||
| 	for (;;) { | 	for (;;) { | ||||||
| 		rxrpc_inc_stat(local->rxnet, stat_io_loop); | 		rxrpc_inc_stat(local->rxnet, stat_io_loop); | ||||||
| 
 | 
 | ||||||
|  | 		/* Deal with connections that want immediate attention. */ | ||||||
|  | 		conn = list_first_entry_or_null(&local->conn_attend_q, | ||||||
|  | 						struct rxrpc_connection, | ||||||
|  | 						attend_link); | ||||||
|  | 		if (conn) { | ||||||
|  | 			spin_lock_bh(&local->lock); | ||||||
|  | 			list_del_init(&conn->attend_link); | ||||||
|  | 			spin_unlock_bh(&local->lock); | ||||||
|  | 
 | ||||||
|  | 			rxrpc_input_conn_event(conn, NULL); | ||||||
|  | 			rxrpc_put_connection(conn, rxrpc_conn_put_poke); | ||||||
|  | 			continue; | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		/* Deal with calls that want immediate attention. */ | 		/* Deal with calls that want immediate attention. */ | ||||||
| 		if ((call = list_first_entry_or_null(&local->call_attend_q, | 		if ((call = list_first_entry_or_null(&local->call_attend_q, | ||||||
| 						     struct rxrpc_call, | 						     struct rxrpc_call, | ||||||
|  | @ -463,6 +478,7 @@ int rxrpc_io_thread(void *data) | ||||||
| 				rxrpc_input_error(local, skb); | 				rxrpc_input_error(local, skb); | ||||||
| 				rxrpc_free_skb(skb, rxrpc_skb_put_error_report); | 				rxrpc_free_skb(skb, rxrpc_skb_put_error_report); | ||||||
| 				break; | 				break; | ||||||
|  | 				break; | ||||||
| 			default: | 			default: | ||||||
| 				WARN_ON_ONCE(1); | 				WARN_ON_ONCE(1); | ||||||
| 				rxrpc_free_skb(skb, rxrpc_skb_put_unknown); | 				rxrpc_free_skb(skb, rxrpc_skb_put_unknown); | ||||||
|  | @ -481,7 +497,8 @@ int rxrpc_io_thread(void *data) | ||||||
| 		set_current_state(TASK_INTERRUPTIBLE); | 		set_current_state(TASK_INTERRUPTIBLE); | ||||||
| 		should_stop = kthread_should_stop(); | 		should_stop = kthread_should_stop(); | ||||||
| 		if (!skb_queue_empty(&local->rx_queue) || | 		if (!skb_queue_empty(&local->rx_queue) || | ||||||
| 		    !list_empty(&local->call_attend_q)) { | 		    !list_empty(&local->call_attend_q) || | ||||||
|  | 		    !list_empty(&local->conn_attend_q)) { | ||||||
| 			__set_current_state(TASK_RUNNING); | 			__set_current_state(TASK_RUNNING); | ||||||
| 			continue; | 			continue; | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | @ -100,6 +100,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net, | ||||||
| 		init_rwsem(&local->defrag_sem); | 		init_rwsem(&local->defrag_sem); | ||||||
| 		init_completion(&local->io_thread_ready); | 		init_completion(&local->io_thread_ready); | ||||||
| 		skb_queue_head_init(&local->rx_queue); | 		skb_queue_head_init(&local->rx_queue); | ||||||
|  | 		INIT_LIST_HEAD(&local->conn_attend_q); | ||||||
| 		INIT_LIST_HEAD(&local->call_attend_q); | 		INIT_LIST_HEAD(&local->call_attend_q); | ||||||
| 		local->client_bundles = RB_ROOT; | 		local->client_bundles = RB_ROOT; | ||||||
| 		spin_lock_init(&local->client_bundles_lock); | 		spin_lock_init(&local->client_bundles_lock); | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue
	
	 David Howells
						David Howells