	rds: use RCU to synchronize work-enqueue with connection teardown
rds_sendmsg() can enqueue work on cp_send_w from process context, but
it should not enqueue this work if connection teardown has commenced
(else we risk enqueuing work after rds_conn_path_destroy() has assumed
that all work has been cancelled/flushed).

Similarly, some other functions such as rds_cong_queue_updates() and
rds_tcp_data_ready() are called in softirq context, and may end up
enqueuing work on rds_wq after rds_conn_path_destroy() has assumed
that all work queues are quiesced.

Check the RDS_DESTROY_PENDING bit and use RCU synchronization to avoid
all these races.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
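The whole change is one pattern, applied at every work-enqueue site and
paired with a synchronize_rcu() on the teardown path. Below is a minimal
sketch of that pattern; demo_path, DEMO_DESTROY_PENDING and the demo_*
functions are hypothetical stand-ins, not the actual net/rds types, but
the kernel primitives (rcu_read_lock(), test_bit(), queue_delayed_work(),
synchronize_rcu(), cancel_delayed_work_sync()) are the real ones the
patch uses.

#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <linux/bitops.h>

/* Hypothetical stand-in for struct rds_conn_path. */
struct demo_path {
	unsigned long		flags;	/* DEMO_DESTROY_PENDING lives here */
	struct delayed_work	send_w;
};
#define DEMO_DESTROY_PENDING	0

/* Enqueue side (process or softirq context): the flag test and the
 * queue_delayed_work() call sit in one RCU read-side critical section,
 * so teardown can wait out any enqueue that raced with the flag being
 * set.
 */
static void demo_maybe_queue_send(struct workqueue_struct *wq,
				  struct demo_path *p)
{
	rcu_read_lock();
	if (!test_bit(DEMO_DESTROY_PENDING, &p->flags))
		queue_delayed_work(wq, &p->send_w, 0);
	rcu_read_unlock();
}

/* Teardown side: publish the flag, then synchronize_rcu() guarantees
 * that every read-side section which could have missed the flag has
 * exited, i.e. has either queued its work or skipped it.  Only then is
 * it safe to cancel/flush and assume nothing new will be enqueued.
 */
static void demo_teardown(struct demo_path *p)
{
	set_bit(DEMO_DESTROY_PENDING, &p->flags);
	synchronize_rcu();
	cancel_delayed_work_sync(&p->send_w);
}

Either a racing reader sees the flag and skips the enqueue, or it
completes the enqueue before synchronize_rcu() returns, so the
subsequent cancel_delayed_work_sync() observes all queued work. The
hunks below apply this guard to each enqueue path (the flag itself is
set on the connection-destroy path, outside the hunks shown here).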
parent c90ecbfaf5
commit 3db6e0d172

6 changed files with 81 additions and 20 deletions
net/rds/cong.c

@@ -219,7 +219,11 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
 	spin_lock_irqsave(&rds_cong_lock, flags);
 
 	list_for_each_entry(conn, &map->m_conn_list, c_map_item) {
-		if (!test_and_set_bit(0, &conn->c_map_queued)) {
+		struct rds_conn_path *cp = &conn->c_path[0];
+
+		rcu_read_lock();
+		if (!test_and_set_bit(0, &conn->c_map_queued) &&
+		    !test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
 			rds_stats_inc(s_cong_update_queued);
 			/* We cannot inline the call to rds_send_xmit() here
 			 * for two reasons (both pertaining to a TCP transport):
@@ -235,9 +239,9 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
 			 *    therefore trigger warnings.
 			 * Defer the xmit to rds_send_worker() instead.
 			 */
-			queue_delayed_work(rds_wq,
-					   &conn->c_path[0].cp_send_w, 0);
+			queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
 		}
+		rcu_read_unlock();
 	}
 
 	spin_unlock_irqrestore(&rds_cong_lock, flags);
net/rds/connection.c

@@ -366,8 +366,6 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
 	 * to the conn hash, so we never trigger a reconnect on this
 	 * conn - the reconnect is always triggered by the active peer. */
 	cancel_delayed_work_sync(&cp->cp_conn_w);
-	if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
-		return;
 	rcu_read_lock();
 	if (!hlist_unhashed(&conn->c_hash_node)) {
 		rcu_read_unlock();
@@ -390,6 +388,7 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
 		return;
 
 	/* make sure lingering queued work won't try to ref the conn */
+	synchronize_rcu();
 	cancel_delayed_work_sync(&cp->cp_send_w);
 	cancel_delayed_work_sync(&cp->cp_recv_w);
 
@@ -407,6 +406,11 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
 	if (cp->cp_xmit_rm)
 		rds_message_put(cp->cp_xmit_rm);
 
+	WARN_ON(delayed_work_pending(&cp->cp_send_w));
+	WARN_ON(delayed_work_pending(&cp->cp_recv_w));
+	WARN_ON(delayed_work_pending(&cp->cp_conn_w));
+	WARN_ON(work_pending(&cp->cp_down_w));
+
 	cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
 }
 
@@ -686,10 +690,13 @@ void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
 {
 	atomic_set(&cp->cp_state, RDS_CONN_ERROR);
 
-	if (!destroy && test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+	rcu_read_lock();
+	if (!destroy && test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+		rcu_read_unlock();
 		return;
-
+	}
 	queue_work(rds_wq, &cp->cp_down_w);
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_drop);
 
@@ -706,9 +713,15 @@ EXPORT_SYMBOL_GPL(rds_conn_drop);
  */
 void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
 {
+	rcu_read_lock();
+	if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+		rcu_read_unlock();
+		return;
+	}
 	if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
 	    !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
 		queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);
 
net/rds/send.c

@@ -162,6 +162,12 @@ int rds_send_xmit(struct rds_conn_path *cp)
 		goto out;
 	}
 
+	if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+		release_in_xmit(cp);
+		ret = -ENETUNREACH; /* dont requeue send work */
+		goto out;
+	}
+
 	/*
 	 * we record the send generation after doing the xmit acquire.
 	 * if someone else manages to jump in and do some work, we'll use
@@ -437,7 +443,12 @@ int rds_send_xmit(struct rds_conn_path *cp)
 		    !list_empty(&cp->cp_send_queue)) && !raced) {
 			if (batch_count < send_batch_count)
 				goto restart;
-			queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+			rcu_read_lock();
+			if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+				ret = -ENETUNREACH;
+			else
+				queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+			rcu_read_unlock();
 		} else if (raced) {
 			rds_stats_inc(s_send_lock_queue_raced);
 		}
@@ -1151,6 +1162,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 	else
 		cpath = &conn->c_path[0];
 
+	if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags)) {
+		ret = -EAGAIN;
+		goto out;
+	}
+
 	rds_conn_path_connect_if_down(cpath);
 
 	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
@@ -1190,9 +1206,17 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 	rds_stats_inc(s_send_queued);
 
 	ret = rds_send_xmit(cpath);
-	if (ret == -ENOMEM || ret == -EAGAIN)
-		queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
-
+	if (ret == -ENOMEM || ret == -EAGAIN) {
+		ret = 0;
+		rcu_read_lock();
+		if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags))
+			ret = -ENETUNREACH;
+		else
+			queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
+		rcu_read_unlock();
+	}
+	if (ret)
+		goto out;
 	rds_message_put(rm);
 	return payload_len;
 
@@ -1270,7 +1294,10 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
 	rds_stats_inc(s_send_pong);
 
 	/* schedule the send work on rds_wq */
-	queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+	rcu_read_lock();
+	if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+	rcu_read_unlock();
 
 	rds_message_put(rm);
 	return 0;
net/rds/tcp_recv.c

@@ -321,8 +321,12 @@ void rds_tcp_data_ready(struct sock *sk)
 	ready = tc->t_orig_data_ready;
 	rds_tcp_stats_inc(s_tcp_data_ready_calls);
 
-	if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM)
-		queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+	if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
+		rcu_read_lock();
+		if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+		rcu_read_unlock();
+	}
 out:
 	read_unlock_bh(&sk->sk_callback_lock);
 	ready(sk);
net/rds/tcp_send.c

@@ -202,8 +202,11 @@ void rds_tcp_write_space(struct sock *sk)
 	tc->t_last_seen_una = rds_tcp_snd_una(tc);
 	rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked);
 
-	if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf)
+	rcu_read_lock();
+	if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
+	    !test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
 		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+	rcu_read_unlock();
 
 out:
 	read_unlock_bh(&sk->sk_callback_lock);
net/rds/threads.c

@@ -87,9 +87,13 @@ void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
 
 	cp->cp_reconnect_jiffies = 0;
 	set_bit(0, &cp->cp_conn->c_map_queued);
-	queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
-	queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+	rcu_read_lock();
+	if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+		queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+	}
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_connect_path_complete);
 
 void rds_connect_complete(struct rds_connection *conn)
@@ -133,7 +137,10 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
 	set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
 	if (cp->cp_reconnect_jiffies == 0) {
 		cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
-		queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+		rcu_read_lock();
+		if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+			queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+		rcu_read_unlock();
 		return;
 	}
 
@@ -141,8 +148,11 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
 	rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
 		 rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
 		 conn, &conn->c_laddr, &conn->c_faddr);
-	queue_delayed_work(rds_wq, &cp->cp_conn_w,
-			   rand % cp->cp_reconnect_jiffies);
+	rcu_read_lock();
+	if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+		queue_delayed_work(rds_wq, &cp->cp_conn_w,
+				   rand % cp->cp_reconnect_jiffies);
+	rcu_read_unlock();
 
 	cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
 					rds_sysctl_reconnect_max_jiffies);