forked from mirrors/linux
		
	rxrpc: Implement a mechanism to send an event notification to a connection
Provide a means by which an event notification can be sent to a connection through such that the I/O thread can pick it up and handle it rather than doing it in a separate workqueue. This is then used to move the deferred final ACK of a call into the I/O thread rather than a separate work queue as part of the drive to do all transmission from the I/O thread. Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org
This commit is contained in:
		
							parent
							
								
									03fc55adf8
								
							
						
					
					
						commit
						f2cce89a07
					
				
					 6 changed files with 55 additions and 9 deletions
				
			
		|  | @ -111,7 +111,7 @@ | |||
| 	EM(rxrpc_conn_get_call_input,		"GET inp-call") \ | ||||
| 	EM(rxrpc_conn_get_conn_input,		"GET inp-conn") \ | ||||
| 	EM(rxrpc_conn_get_idle,			"GET idle    ") \ | ||||
| 	EM(rxrpc_conn_get_poke,			"GET poke    ") \ | ||||
| 	EM(rxrpc_conn_get_poke_timer,		"GET poke    ") \ | ||||
| 	EM(rxrpc_conn_get_service_conn,		"GET svc-conn") \ | ||||
| 	EM(rxrpc_conn_new_client,		"NEW client  ") \ | ||||
| 	EM(rxrpc_conn_new_service,		"NEW service ") \ | ||||
|  | @ -126,10 +126,9 @@ | |||
| 	EM(rxrpc_conn_put_service_reaped,	"PUT svc-reap") \ | ||||
| 	EM(rxrpc_conn_put_unbundle,		"PUT unbundle") \ | ||||
| 	EM(rxrpc_conn_put_unidle,		"PUT unidle  ") \ | ||||
| 	EM(rxrpc_conn_put_work,			"PUT work    ") \ | ||||
| 	EM(rxrpc_conn_queue_challenge,		"QUE chall   ") \ | ||||
| 	EM(rxrpc_conn_queue_retry_work,		"QUE retry-wk") \ | ||||
| 	EM(rxrpc_conn_queue_rx_work,		"QUE rx-work ") \ | ||||
| 	EM(rxrpc_conn_queue_timer,		"QUE timer   ") \ | ||||
| 	EM(rxrpc_conn_see_new_service_conn,	"SEE new-svc ") \ | ||||
| 	EM(rxrpc_conn_see_reap_service,		"SEE reap-svc") \ | ||||
| 	E_(rxrpc_conn_see_work,			"SEE work    ") | ||||
|  |  | |||
|  | @ -202,6 +202,7 @@ struct rxrpc_host_header { | |||
|  * - max 48 bytes (struct sk_buff::cb) | ||||
|  */ | ||||
| struct rxrpc_skb_priv { | ||||
| 	struct rxrpc_connection *conn;	/* Connection referred to (poke packet) */ | ||||
| 	u16		offset;		/* Offset of data */ | ||||
| 	u16		len;		/* Length of data */ | ||||
| 	u8		flags; | ||||
|  | @ -292,6 +293,7 @@ struct rxrpc_local { | |||
| 	struct rxrpc_sock __rcu	*service;	/* Service(s) listening on this endpoint */ | ||||
| 	struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */ | ||||
| 	struct sk_buff_head	rx_queue;	/* Received packets */ | ||||
| 	struct list_head	conn_attend_q;	/* Conns requiring immediate attention */ | ||||
| 	struct list_head	call_attend_q;	/* Calls requiring immediate attention */ | ||||
| 	struct rb_root		client_bundles;	/* Client connection bundles by socket params */ | ||||
| 	spinlock_t		client_bundles_lock; /* Lock for client_bundles */ | ||||
|  | @ -441,6 +443,7 @@ struct rxrpc_connection { | |||
| 	struct rxrpc_peer	*peer;		/* Remote endpoint */ | ||||
| 	struct rxrpc_net	*rxnet;		/* Network namespace to which call belongs */ | ||||
| 	struct key		*key;		/* Security details */ | ||||
| 	struct list_head	attend_link;	/* Link in local->conn_attend_q */ | ||||
| 
 | ||||
| 	refcount_t		ref; | ||||
| 	atomic_t		active;		/* Active count for service conns */ | ||||
|  | @ -905,6 +908,7 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, struct sk_buff *s | |||
| void rxrpc_process_connection(struct work_struct *); | ||||
| void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool); | ||||
| int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb); | ||||
| void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb); | ||||
| 
 | ||||
| /*
 | ||||
|  * conn_object.c | ||||
|  | @ -912,6 +916,7 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb); | |||
| extern unsigned int rxrpc_connection_expiry; | ||||
| extern unsigned int rxrpc_closed_conn_expiry; | ||||
| 
 | ||||
| void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why); | ||||
| struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *, gfp_t); | ||||
| struct rxrpc_connection *rxrpc_find_client_connection_rcu(struct rxrpc_local *, | ||||
| 							  struct sockaddr_rxrpc *, | ||||
|  |  | |||
|  | @ -412,10 +412,6 @@ static void rxrpc_do_process_connection(struct rxrpc_connection *conn) | |||
| 	if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events)) | ||||
| 		rxrpc_secure_connection(conn); | ||||
| 
 | ||||
| 	/* Process delayed ACKs whose time has come. */ | ||||
| 	if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK) | ||||
| 		rxrpc_process_delayed_final_acks(conn, false); | ||||
| 
 | ||||
| 	/* go through the conn-level event packets, releasing the ref on this
 | ||||
| 	 * connection that each one has when we've finished with it */ | ||||
| 	while ((skb = skb_dequeue(&conn->rx_queue))) { | ||||
|  | @ -515,3 +511,13 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb) | |||
| 		return -EPROTO; | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| /*
 | ||||
|  * Input a connection event. | ||||
|  */ | ||||
| void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb) | ||||
| { | ||||
| 	/* Process delayed ACKs whose time has come. */ | ||||
| 	if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK) | ||||
| 		rxrpc_process_delayed_final_acks(conn, false); | ||||
| } | ||||
|  |  | |||
|  | @ -23,12 +23,30 @@ static void rxrpc_clean_up_connection(struct work_struct *work); | |||
| static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet, | ||||
| 					 unsigned long reap_at); | ||||
| 
 | ||||
| void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why) | ||||
| { | ||||
| 	struct rxrpc_local *local = conn->local; | ||||
| 	bool busy; | ||||
| 
 | ||||
| 	if (WARN_ON_ONCE(!local)) | ||||
| 		return; | ||||
| 
 | ||||
| 	spin_lock_bh(&local->lock); | ||||
| 	busy = !list_empty(&conn->attend_link); | ||||
| 	if (!busy) { | ||||
| 		rxrpc_get_connection(conn, why); | ||||
| 		list_add_tail(&conn->attend_link, &local->conn_attend_q); | ||||
| 	} | ||||
| 	spin_unlock_bh(&local->lock); | ||||
| 	rxrpc_wake_up_io_thread(local); | ||||
| } | ||||
| 
 | ||||
| static void rxrpc_connection_timer(struct timer_list *timer) | ||||
| { | ||||
| 	struct rxrpc_connection *conn = | ||||
| 		container_of(timer, struct rxrpc_connection, timer); | ||||
| 
 | ||||
| 	rxrpc_queue_conn(conn, rxrpc_conn_queue_timer); | ||||
| 	rxrpc_poke_conn(conn, rxrpc_conn_get_poke_timer); | ||||
| } | ||||
| 
 | ||||
| /*
 | ||||
|  |  | |||
|  | @ -421,6 +421,7 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn, | |||
|  */ | ||||
| int rxrpc_io_thread(void *data) | ||||
| { | ||||
| 	struct rxrpc_connection *conn; | ||||
| 	struct sk_buff_head rx_queue; | ||||
| 	struct rxrpc_local *local = data; | ||||
| 	struct rxrpc_call *call; | ||||
|  | @ -436,6 +437,20 @@ int rxrpc_io_thread(void *data) | |||
| 	for (;;) { | ||||
| 		rxrpc_inc_stat(local->rxnet, stat_io_loop); | ||||
| 
 | ||||
| 		/* Deal with connections that want immediate attention. */ | ||||
| 		conn = list_first_entry_or_null(&local->conn_attend_q, | ||||
| 						struct rxrpc_connection, | ||||
| 						attend_link); | ||||
| 		if (conn) { | ||||
| 			spin_lock_bh(&local->lock); | ||||
| 			list_del_init(&conn->attend_link); | ||||
| 			spin_unlock_bh(&local->lock); | ||||
| 
 | ||||
| 			rxrpc_input_conn_event(conn, NULL); | ||||
| 			rxrpc_put_connection(conn, rxrpc_conn_put_poke); | ||||
| 			continue; | ||||
| 		} | ||||
| 
 | ||||
| 		/* Deal with calls that want immediate attention. */ | ||||
| 		if ((call = list_first_entry_or_null(&local->call_attend_q, | ||||
| 						     struct rxrpc_call, | ||||
|  | @ -463,6 +478,7 @@ int rxrpc_io_thread(void *data) | |||
| 				rxrpc_input_error(local, skb); | ||||
| 				rxrpc_free_skb(skb, rxrpc_skb_put_error_report); | ||||
| 				break; | ||||
| 				break; | ||||
| 			default: | ||||
| 				WARN_ON_ONCE(1); | ||||
| 				rxrpc_free_skb(skb, rxrpc_skb_put_unknown); | ||||
|  | @ -481,7 +497,8 @@ int rxrpc_io_thread(void *data) | |||
| 		set_current_state(TASK_INTERRUPTIBLE); | ||||
| 		should_stop = kthread_should_stop(); | ||||
| 		if (!skb_queue_empty(&local->rx_queue) || | ||||
| 		    !list_empty(&local->call_attend_q)) { | ||||
| 		    !list_empty(&local->call_attend_q) || | ||||
| 		    !list_empty(&local->conn_attend_q)) { | ||||
| 			__set_current_state(TASK_RUNNING); | ||||
| 			continue; | ||||
| 		} | ||||
|  |  | |||
|  | @ -100,6 +100,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net, | |||
| 		init_rwsem(&local->defrag_sem); | ||||
| 		init_completion(&local->io_thread_ready); | ||||
| 		skb_queue_head_init(&local->rx_queue); | ||||
| 		INIT_LIST_HEAD(&local->conn_attend_q); | ||||
| 		INIT_LIST_HEAD(&local->call_attend_q); | ||||
| 		local->client_bundles = RB_ROOT; | ||||
| 		spin_lock_init(&local->client_bundles_lock); | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue
	
	 David Howells
						David Howells