SUNRPC: Improve ordering of transport processing

Since it can take a while before a specific thread gets scheduled, it is
better to just implement a first come first served queue mechanism. That
way, if a thread is already scheduled and is idle, it can pick up the work
to do from the queue.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
parent 95da1b3a5a
commit 22700f3c6d

2 changed files with 31 additions and 70 deletions
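The first come first served model described in the commit message can be illustrated outside the kernel. The following is a minimal user-space C sketch, not the patch itself; all names in it (pool, work_item, pool_enqueue, pool_dequeue) are invented for illustration. Producers always append work to a FIFO and then wake at most one idle worker; an idle worker always services the oldest queued item. In the patch the corresponding pieces are pool->sp_sockets (the FIFO), svc_xprt_do_enqueue() (the producer side) and svc_get_next_xprt() (the consumer side), as the diff below shows.

/*
 * Minimal user-space sketch of a first come first served work queue.
 * NOT kernel code; names are illustrative only.  Build: cc -pthread fcfs.c
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct work_item {
	int id;
	struct work_item *next;
};

struct pool {
	pthread_mutex_t   lock;
	pthread_cond_t    more_work;	/* wakes at most one idle worker */
	struct work_item *head, *tail;	/* FIFO: append at tail, take from head */
	int               shutting_down;
};

/* Producer side (cf. svc_xprt_do_enqueue): queue first, then wake one idle worker. */
static void pool_enqueue(struct pool *p, struct work_item *w)
{
	pthread_mutex_lock(&p->lock);
	w->next = NULL;
	if (p->tail)
		p->tail->next = w;
	else
		p->head = w;
	p->tail = w;
	pthread_mutex_unlock(&p->lock);
	pthread_cond_signal(&p->more_work);
}

/* Consumer side (cf. svc_get_next_xprt): always take the oldest queued item. */
static struct work_item *pool_dequeue(struct pool *p)
{
	struct work_item *w;

	pthread_mutex_lock(&p->lock);
	while (!p->head && !p->shutting_down)
		pthread_cond_wait(&p->more_work, &p->lock);
	w = p->head;			/* NULL only once shutting down and drained */
	if (w) {
		p->head = w->next;
		if (!p->head)
			p->tail = NULL;
	}
	pthread_mutex_unlock(&p->lock);
	return w;
}

static void *worker(void *arg)
{
	struct work_item *w;

	while ((w = pool_dequeue(arg)) != NULL) {
		printf("servicing item %d\n", w->id);
		free(w);
	}
	return NULL;
}

int main(void)
{
	struct pool p = {
		.lock      = PTHREAD_MUTEX_INITIALIZER,
		.more_work = PTHREAD_COND_INITIALIZER,
	};
	pthread_t t[2];
	int i;

	for (i = 0; i < 2; i++)
		pthread_create(&t[i], NULL, worker, &p);
	for (i = 0; i < 8; i++) {
		struct work_item *w = malloc(sizeof(*w));
		w->id = i;
		pool_enqueue(&p, w);
	}
	/* Request shutdown; workers still drain anything already queued. */
	pthread_mutex_lock(&p.lock);
	p.shutting_down = 1;
	pthread_mutex_unlock(&p.lock);
	pthread_cond_broadcast(&p.more_work);
	for (i = 0; i < 2; i++)
		pthread_join(t[i], NULL);
	return 0;
}

Because the work is queued before any wakeup is attempted, a slow-to-schedule thread no longer stalls the transport: whichever thread becomes idle next finds it at the head of the queue.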
				
			
include/linux/sunrpc/svc.h

@@ -46,6 +46,7 @@ struct svc_pool {
 	struct svc_pool_stats	sp_stats;	/* statistics on pool operation */
 #define	SP_TASK_PENDING		(0)		/* still work to do even if no
 						 * xprt is queued. */
+#define SP_CONGESTED		(1)
 	unsigned long		sp_flags;
 } ____cacheline_aligned_in_smp;
 
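A note on the line added above: SP_TASK_PENDING and SP_CONGESTED are bit numbers within the pool's sp_flags word, which the kernel manipulates with its atomic bit helpers (set_bit(), clear_bit(), test_bit()). The sketch below is a rough user-space approximation of that idiom using C11 atomics; set_flag(), clear_flag() and test_flag() are invented names, not kernel APIs.

/* Rough user-space approximation of the kernel's sp_flags bit idiom.
 * NOT kernel code; set_flag/clear_flag/test_flag are invented names.
 */
#include <stdatomic.h>
#include <stdio.h>

#define SP_TASK_PENDING	0	/* bit numbers, as in the hunk above */
#define SP_CONGESTED	1

static atomic_ulong sp_flags;	/* stands in for struct svc_pool::sp_flags */

static void set_flag(unsigned int bit)   { atomic_fetch_or(&sp_flags, 1UL << bit); }
static void clear_flag(unsigned int bit) { atomic_fetch_and(&sp_flags, ~(1UL << bit)); }
static int  test_flag(unsigned int bit)  { return (atomic_load(&sp_flags) >> bit) & 1; }

int main(void)
{
	set_flag(SP_CONGESTED);		/* enqueue path: no idle thread was found */
	printf("congested=%d pending=%d\n",
	       test_flag(SP_CONGESTED), test_flag(SP_TASK_PENDING));
	clear_flag(SP_CONGESTED);	/* server thread about to go back to sleep */
	printf("congested=%d\n", test_flag(SP_CONGESTED));
	return 0;
}

In the hunks that follow, the enqueue path sets SP_CONGESTED when it cannot find an idle thread, and svc_get_next_xprt() clears it before sleeping and consults it when choosing how long to wait for cache updates.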
net/sunrpc/svc_xprt.c

@@ -380,7 +380,6 @@ void svc_xprt_do_enqueue(struct svc_xprt *xprt)
 	struct svc_pool *pool;
 	struct svc_rqst	*rqstp = NULL;
 	int cpu;
-	bool queued = false;
 
 	if (!svc_xprt_has_something_to_do(xprt))
 		goto out;
@@ -401,58 +400,25 @@ void svc_xprt_do_enqueue(struct svc_xprt *xprt)
 
 	atomic_long_inc(&pool->sp_stats.packets);
 
-redo_search:
+	dprintk("svc: transport %p put into queue\n", xprt);
+	spin_lock_bh(&pool->sp_lock);
+	list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
+	pool->sp_stats.sockets_queued++;
+	spin_unlock_bh(&pool->sp_lock);
+
 	/* find a thread for this xprt */
 	rcu_read_lock();
 	list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
-		/* Do a lockless check first */
-		if (test_bit(RQ_BUSY, &rqstp->rq_flags))
+		if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags))
 			continue;
-
-		/*
-		 * Once the xprt has been queued, it can only be dequeued by
-		 * the task that intends to service it. All we can do at that
-		 * point is to try to wake this thread back up so that it can
-		 * do so.
-		 */
-		if (!queued) {
-			spin_lock_bh(&rqstp->rq_lock);
-			if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) {
-				/* already busy, move on... */
-				spin_unlock_bh(&rqstp->rq_lock);
-				continue;
-			}
-
-			/* this one will do */
-			rqstp->rq_xprt = xprt;
-			svc_xprt_get(xprt);
-			spin_unlock_bh(&rqstp->rq_lock);
-		}
-		rcu_read_unlock();
-
 		atomic_long_inc(&pool->sp_stats.threads_woken);
 		wake_up_process(rqstp->rq_task);
-		put_cpu();
-		goto out;
-	}
-	rcu_read_unlock();
-
-	/*
-	 * We didn't find an idle thread to use, so we need to queue the xprt.
-	 * Do so and then search again. If we find one, we can't hook this one
-	 * up to it directly but we can wake the thread up in the hopes that it
-	 * will pick it up once it searches for a xprt to service.
-	 */
-	if (!queued) {
-		queued = true;
-		dprintk("svc: transport %p put into queue\n", xprt);
-		spin_lock_bh(&pool->sp_lock);
-		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
-		pool->sp_stats.sockets_queued++;
-		spin_unlock_bh(&pool->sp_lock);
-		goto redo_search;
+		goto out_unlock;
 	}
+	set_bit(SP_CONGESTED, &pool->sp_flags);
 	rqstp = NULL;
+out_unlock:
+	rcu_read_unlock();
 	put_cpu();
 out:
 	trace_svc_xprt_do_enqueue(xprt, rqstp);
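Assembled from the context and '+' lines of the hunk above, the tail of svc_xprt_do_enqueue() reads as follows after the patch (the unchanged start of the function is not shown):

	atomic_long_inc(&pool->sp_stats.packets);

	dprintk("svc: transport %p put into queue\n", xprt);
	spin_lock_bh(&pool->sp_lock);
	list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
	pool->sp_stats.sockets_queued++;
	spin_unlock_bh(&pool->sp_lock);

	/* find a thread for this xprt */
	rcu_read_lock();
	list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
		if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags))
			continue;
		atomic_long_inc(&pool->sp_stats.threads_woken);
		wake_up_process(rqstp->rq_task);
		goto out_unlock;
	}
	set_bit(SP_CONGESTED, &pool->sp_flags);
	rqstp = NULL;
out_unlock:
	rcu_read_unlock();
	put_cpu();
out:
	trace_svc_xprt_do_enqueue(xprt, rqstp);

The transport is now queued unconditionally before any wakeup is attempted; the first idle thread found is woken without handing it the xprt directly, and SP_CONGESTED records that no idle thread was available.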
@@ -721,38 +687,25 @@ rqst_should_sleep(struct svc_rqst *rqstp)
 
 static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
 {
-	struct svc_xprt *xprt;
 	struct svc_pool		*pool = rqstp->rq_pool;
 	long			time_left = 0;
 
 	/* rq_xprt should be clear on entry */
 	WARN_ON_ONCE(rqstp->rq_xprt);
 
-	/* Normally we will wait up to 5 seconds for any required
-	 * cache information to be provided.
-	 */
-	rqstp->rq_chandle.thread_wait = 5*HZ;
-
-	xprt = svc_xprt_dequeue(pool);
-	if (xprt) {
-		rqstp->rq_xprt = xprt;
-
-		/* As there is a shortage of threads and this request
-		 * had to be queued, don't allow the thread to wait so
-		 * long for cache updates.
-		 */
-		rqstp->rq_chandle.thread_wait = 1*HZ;
-		clear_bit(SP_TASK_PENDING, &pool->sp_flags);
-		return xprt;
-	}
+	rqstp->rq_xprt = svc_xprt_dequeue(pool);
+	if (rqstp->rq_xprt)
+		goto out_found;
 
 	/*
 	 * We have to be able to interrupt this wait
 	 * to bring down the daemons ...
 	 */
 	set_current_state(TASK_INTERRUPTIBLE);
+	smp_mb__before_atomic();
+	clear_bit(SP_CONGESTED, &pool->sp_flags);
 	clear_bit(RQ_BUSY, &rqstp->rq_flags);
-	smp_mb();
+	smp_mb__after_atomic();
 
 	if (likely(rqst_should_sleep(rqstp)))
 		time_left = schedule_timeout(timeout);
@@ -761,13 +714,11 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
 
 	try_to_freeze();
 
-	spin_lock_bh(&rqstp->rq_lock);
 	set_bit(RQ_BUSY, &rqstp->rq_flags);
-	spin_unlock_bh(&rqstp->rq_lock);
-
-	xprt = rqstp->rq_xprt;
-	if (xprt != NULL)
-		return xprt;
+	smp_mb__after_atomic();
+	rqstp->rq_xprt = svc_xprt_dequeue(pool);
+	if (rqstp->rq_xprt)
+		goto out_found;
 
 	if (!time_left)
 		atomic_long_inc(&pool->sp_stats.threads_timedout);
@@ -775,6 +726,15 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
 	if (signalled() || kthread_should_stop())
 		return ERR_PTR(-EINTR);
 	return ERR_PTR(-EAGAIN);
+out_found:
+	/* Normally we will wait up to 5 seconds for any required
+	 * cache information to be provided.
+	 */
+	if (!test_bit(SP_CONGESTED, &pool->sp_flags))
+		rqstp->rq_chandle.thread_wait = 5*HZ;
+	else
+		rqstp->rq_chandle.thread_wait = 1*HZ;
+	return rqstp->rq_xprt;
 }
 
 static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)
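Putting the three svc_get_next_xprt() hunks together, the function reads as follows after the patch (two unchanged lines that fall between the hunks are elided):

static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
{
	struct svc_pool		*pool = rqstp->rq_pool;
	long			time_left = 0;

	/* rq_xprt should be clear on entry */
	WARN_ON_ONCE(rqstp->rq_xprt);

	rqstp->rq_xprt = svc_xprt_dequeue(pool);
	if (rqstp->rq_xprt)
		goto out_found;

	/*
	 * We have to be able to interrupt this wait
	 * to bring down the daemons ...
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	smp_mb__before_atomic();
	clear_bit(SP_CONGESTED, &pool->sp_flags);
	clear_bit(RQ_BUSY, &rqstp->rq_flags);
	smp_mb__after_atomic();

	if (likely(rqst_should_sleep(rqstp)))
		time_left = schedule_timeout(timeout);
	/* ... two unchanged lines between the hunks elided ... */

	try_to_freeze();

	set_bit(RQ_BUSY, &rqstp->rq_flags);
	smp_mb__after_atomic();
	rqstp->rq_xprt = svc_xprt_dequeue(pool);
	if (rqstp->rq_xprt)
		goto out_found;

	if (!time_left)
		atomic_long_inc(&pool->sp_stats.threads_timedout);

	if (signalled() || kthread_should_stop())
		return ERR_PTR(-EINTR);
	return ERR_PTR(-EAGAIN);
out_found:
	/* Normally we will wait up to 5 seconds for any required
	 * cache information to be provided.
	 */
	if (!test_bit(SP_CONGESTED, &pool->sp_flags))
		rqstp->rq_chandle.thread_wait = 5*HZ;
	else
		rqstp->rq_chandle.thread_wait = 1*HZ;
	return rqstp->rq_xprt;
}

A woken (or newly idle) thread always goes back to svc_xprt_dequeue() for the oldest queued transport, and the shorter 1*HZ cache wait is applied only when the pool was marked congested.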