forked from mirrors/gecko-dev
		
	 3f2298c2f6
			
		
	
	
		3f2298c2f6
		
	
	
	
	
		
			
			MozReview-Commit-ID: 666LNaHyiuQ --HG-- extra : rebase_source : f3a24e0342d8dcdf9020dd473bc6b1045cbc533b
		
			
				
	
	
		
			467 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			467 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
 | |
| /* vim:set ts=2 sw=2 sts=2 et cindent: */
 | |
| /* This Source Code Form is subject to the terms of the Mozilla Public
 | |
|  * License, v. 2.0. If a copy of the MPL was not distributed with this
 | |
|  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 | |
| 
 | |
| #include "EventTokenBucket.h"
 | |
| 
 | |
| #include "nsICancelable.h"
 | |
| #include "nsIIOService.h"
 | |
| #include "nsNetCID.h"
 | |
| #include "nsNetUtil.h"
 | |
| #include "nsServiceManagerUtils.h"
 | |
| #include "nsSocketTransportService2.h"
 | |
| #ifdef DEBUG
 | |
| #include "MainThreadUtils.h"
 | |
| #endif
 | |
| 
 | |
| #ifdef XP_WIN
 | |
| #include <windows.h>
 | |
| #include <mmsystem.h>
 | |
| #endif
 | |
| 
 | |
| namespace mozilla {
 | |
| namespace net {
 | |
| 
 | |
| ////////////////////////////////////////////
 | |
| // EventTokenBucketCancelable
 | |
| ////////////////////////////////////////////
 | |
| 
 | |
| class TokenBucketCancelable : public nsICancelable
 | |
| {
 | |
| public:
 | |
|   NS_DECL_THREADSAFE_ISUPPORTS
 | |
|   NS_DECL_NSICANCELABLE
 | |
| 
 | |
|   explicit TokenBucketCancelable(class ATokenBucketEvent *event);
 | |
|   void Fire();
 | |
| 
 | |
| private:
 | |
|   virtual ~TokenBucketCancelable() = default;
 | |
| 
 | |
|   friend class EventTokenBucket;
 | |
|   ATokenBucketEvent *mEvent;
 | |
| };
 | |
| 
 | |
| NS_IMPL_ISUPPORTS(TokenBucketCancelable, nsICancelable)
 | |
| 
 | |
| TokenBucketCancelable::TokenBucketCancelable(ATokenBucketEvent *event)
 | |
|   : mEvent(event)
 | |
| {
 | |
| }
 | |
| 
 | |
| NS_IMETHODIMP
 | |
| TokenBucketCancelable::Cancel(nsresult reason)
 | |
| {
 | |
|   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
 | |
|   mEvent = nullptr;
 | |
|   return NS_OK;
 | |
| }
 | |
| 
 | |
| void
 | |
| TokenBucketCancelable::Fire()
 | |
| {
 | |
|   if (!mEvent)
 | |
|     return;
 | |
| 
 | |
|   ATokenBucketEvent *event = mEvent;
 | |
|   mEvent = nullptr;
 | |
|   event->OnTokenBucketAdmitted();
 | |
| }
 | |
| 
 | |
| ////////////////////////////////////////////
 | |
| // EventTokenBucket
 | |
| ////////////////////////////////////////////
 | |
| 
 | |
| NS_IMPL_ISUPPORTS(EventTokenBucket, nsITimerCallback, nsINamed)
 | |
| 
 | |
| // by default 1hz with no burst
 | |
| EventTokenBucket::EventTokenBucket(uint32_t eventsPerSecond,
 | |
|                                    uint32_t burstSize)
 | |
|   : mUnitCost(kUsecPerSec)
 | |
|   , mMaxCredit(kUsecPerSec)
 | |
|   , mCredit(kUsecPerSec)
 | |
|   , mPaused(false)
 | |
|   , mStopped(false)
 | |
|   , mTimerArmed(false)
 | |
| #ifdef XP_WIN
 | |
|   , mFineGrainTimerInUse(false)
 | |
|   , mFineGrainResetTimerArmed(false)
 | |
| #endif
 | |
| {
 | |
|   mLastUpdate = TimeStamp::Now();
 | |
| 
 | |
|   MOZ_ASSERT(NS_IsMainThread());
 | |
| 
 | |
|   nsresult rv;
 | |
|   nsCOMPtr<nsIEventTarget> sts;
 | |
|   nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
 | |
|   if (NS_SUCCEEDED(rv))
 | |
|     sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
 | |
|   if (NS_SUCCEEDED(rv))
 | |
|     mTimer = NS_NewTimer(sts);
 | |
|   SetRate(eventsPerSecond, burstSize);
 | |
| }
 | |
| 
 | |
| EventTokenBucket::~EventTokenBucket()
 | |
| {
 | |
|   SOCKET_LOG(("EventTokenBucket::dtor %p events=%zu\n",
 | |
|               this, mEvents.GetSize()));
 | |
| 
 | |
|   CleanupTimers();
 | |
| 
 | |
|   // Complete any queued events to prevent hangs
 | |
|   while (mEvents.GetSize()) {
 | |
|     RefPtr<TokenBucketCancelable> cancelable =
 | |
|       dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront()));
 | |
|     cancelable->Fire();
 | |
|   }
 | |
| }
 | |
| 
 | |
| void
 | |
| EventTokenBucket::CleanupTimers()
 | |
| {
 | |
|   if (mTimer && mTimerArmed) {
 | |
|     mTimer->Cancel();
 | |
|   }
 | |
|   mTimer = nullptr;
 | |
|   mTimerArmed = false;
 | |
| 
 | |
| #ifdef XP_WIN
 | |
|   NormalTimers();
 | |
|   if (mFineGrainResetTimer && mFineGrainResetTimerArmed) {
 | |
|     mFineGrainResetTimer->Cancel();
 | |
|   }
 | |
|   mFineGrainResetTimer = nullptr;
 | |
|   mFineGrainResetTimerArmed = false;
 | |
| #endif
 | |
| }
 | |
| 
 | |
| void
 | |
| EventTokenBucket::SetRate(uint32_t eventsPerSecond,
 | |
|                           uint32_t burstSize)
 | |
| {
 | |
|   SOCKET_LOG(("EventTokenBucket::SetRate %p %u %u\n",
 | |
|               this, eventsPerSecond, burstSize));
 | |
| 
 | |
|   if (eventsPerSecond > kMaxHz) {
 | |
|     eventsPerSecond = kMaxHz;
 | |
|     SOCKET_LOG(("  eventsPerSecond out of range\n"));
 | |
|   }
 | |
| 
 | |
|   if (!eventsPerSecond) {
 | |
|     eventsPerSecond = 1;
 | |
|     SOCKET_LOG(("  eventsPerSecond out of range\n"));
 | |
|   }
 | |
| 
 | |
|   mUnitCost = kUsecPerSec / eventsPerSecond;
 | |
|   mMaxCredit = mUnitCost * burstSize;
 | |
|   if (mMaxCredit > kUsecPerSec * 60 * 15) {
 | |
|     SOCKET_LOG(("  burstSize out of range\n"));
 | |
|     mMaxCredit = kUsecPerSec * 60 * 15;
 | |
|   }
 | |
|   mCredit = mMaxCredit;
 | |
|   mLastUpdate = TimeStamp::Now();
 | |
| }
 | |
| 
 | |
| void
 | |
| EventTokenBucket::ClearCredits()
 | |
| {
 | |
|   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
 | |
|   SOCKET_LOG(("EventTokenBucket::ClearCredits %p\n", this));
 | |
|   mCredit = 0;
 | |
| }
 | |
| 
 | |
| uint32_t
 | |
| EventTokenBucket::BurstEventsAvailable()
 | |
| {
 | |
|   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
 | |
|   return static_cast<uint32_t>(mCredit / mUnitCost);
 | |
| }
 | |
| 
 | |
| uint32_t
 | |
| EventTokenBucket::QueuedEvents()
 | |
| {
 | |
|   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
 | |
|   return mEvents.GetSize();
 | |
| }
 | |
| 
 | |
| void
 | |
| EventTokenBucket::Pause()
 | |
| {
 | |
|   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
 | |
|   SOCKET_LOG(("EventTokenBucket::Pause %p\n", this));
 | |
|   if (mPaused || mStopped)
 | |
|     return;
 | |
| 
 | |
|   mPaused = true;
 | |
|   if (mTimerArmed) {
 | |
|     mTimer->Cancel();
 | |
|     mTimerArmed = false;
 | |
|   }
 | |
| }
 | |
| 
 | |
| void
 | |
| EventTokenBucket::UnPause()
 | |
| {
 | |
|   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
 | |
|   SOCKET_LOG(("EventTokenBucket::UnPause %p\n", this));
 | |
|   if (!mPaused || mStopped)
 | |
|     return;
 | |
| 
 | |
|   mPaused = false;
 | |
|   DispatchEvents();
 | |
|   UpdateTimer();
 | |
| }
 | |
| 
 | |
| void
 | |
| EventTokenBucket::Stop()
 | |
| {
 | |
|   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
 | |
|   SOCKET_LOG(("EventTokenBucket::Stop %p armed=%d\n", this, mTimerArmed));
 | |
|   mStopped = true;
 | |
|   CleanupTimers();
 | |
| 
 | |
|   // Complete any queued events to prevent hangs
 | |
|   while (mEvents.GetSize()) {
 | |
|     RefPtr<TokenBucketCancelable> cancelable =
 | |
|       dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront()));
 | |
|     cancelable->Fire();
 | |
|   }
 | |
| }
 | |
| 
 | |
| nsresult
 | |
| EventTokenBucket::SubmitEvent(ATokenBucketEvent *event, nsICancelable **cancelable)
 | |
| {
 | |
|   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
 | |
|   SOCKET_LOG(("EventTokenBucket::SubmitEvent %p\n", this));
 | |
| 
 | |
|   if (mStopped || !mTimer)
 | |
|     return NS_ERROR_FAILURE;
 | |
| 
 | |
|   UpdateCredits();
 | |
| 
 | |
|   RefPtr<TokenBucketCancelable> cancelEvent = new TokenBucketCancelable(event);
 | |
|   // When this function exits the cancelEvent needs 2 references, one for the
 | |
|   // mEvents queue and one for the caller of SubmitEvent()
 | |
| 
 | |
|   NS_ADDREF(*cancelable = cancelEvent.get());
 | |
| 
 | |
|   if (mPaused || !TryImmediateDispatch(cancelEvent.get())) {
 | |
|     // queue it
 | |
|     SOCKET_LOG(("   queued\n"));
 | |
|     mEvents.Push(cancelEvent.forget().take());
 | |
|     UpdateTimer();
 | |
|   }
 | |
|   else {
 | |
|     SOCKET_LOG(("   dispatched synchronously\n"));
 | |
|   }
 | |
| 
 | |
|   return NS_OK;
 | |
| }
 | |
| 
 | |
| bool
 | |
| EventTokenBucket::TryImmediateDispatch(TokenBucketCancelable *cancelable)
 | |
| {
 | |
|   if (mCredit < mUnitCost)
 | |
|     return false;
 | |
| 
 | |
|   mCredit -= mUnitCost;
 | |
|   cancelable->Fire();
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| void
 | |
| EventTokenBucket::DispatchEvents()
 | |
| {
 | |
|   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
 | |
|   SOCKET_LOG(("EventTokenBucket::DispatchEvents %p %d\n", this, mPaused));
 | |
|   if (mPaused || mStopped)
 | |
|     return;
 | |
| 
 | |
|   while (mEvents.GetSize() && mUnitCost <= mCredit) {
 | |
|     RefPtr<TokenBucketCancelable> cancelable =
 | |
|       dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront()));
 | |
|     if (cancelable->mEvent) {
 | |
|       SOCKET_LOG(("EventTokenBucket::DispachEvents [%p] "
 | |
|                   "Dispatching queue token bucket event cost=%" PRIu64 " credit=%" PRIu64 "\n",
 | |
|                   this, mUnitCost, mCredit));
 | |
|       mCredit -= mUnitCost;
 | |
|       cancelable->Fire();
 | |
|     }
 | |
|   }
 | |
| 
 | |
| #ifdef XP_WIN
 | |
|   if (!mEvents.GetSize())
 | |
|     WantNormalTimers();
 | |
| #endif
 | |
| }
 | |
| 
 | |
| void
 | |
| EventTokenBucket::UpdateTimer()
 | |
| {
 | |
|   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
 | |
|   if (mTimerArmed || mPaused || mStopped || !mEvents.GetSize() || !mTimer)
 | |
|     return;
 | |
| 
 | |
|   if (mCredit >= mUnitCost)
 | |
|     return;
 | |
| 
 | |
|   // determine the time needed to wait to accumulate enough credits to admit
 | |
|   // one more event and set the timer for that point. Always round it
 | |
|   // up because firing early doesn't help.
 | |
|   //
 | |
|   uint64_t deficit = mUnitCost - mCredit;
 | |
|   uint64_t msecWait = (deficit + (kUsecPerMsec - 1)) / kUsecPerMsec;
 | |
| 
 | |
|   if (msecWait < 4) // minimum wait
 | |
|     msecWait = 4;
 | |
|   else if (msecWait > 60000) // maximum wait
 | |
|     msecWait = 60000;
 | |
| 
 | |
| #ifdef XP_WIN
 | |
|   FineGrainTimers();
 | |
| #endif
 | |
| 
 | |
|   SOCKET_LOG(("EventTokenBucket::UpdateTimer %p for %" PRIu64 "ms\n",
 | |
|               this, msecWait));
 | |
|   nsresult rv = mTimer->InitWithCallback(this, static_cast<uint32_t>(msecWait),
 | |
|                                          nsITimer::TYPE_ONE_SHOT);
 | |
|   mTimerArmed = NS_SUCCEEDED(rv);
 | |
| }
 | |
| 
 | |
| NS_IMETHODIMP
 | |
| EventTokenBucket::Notify(nsITimer *timer)
 | |
| {
 | |
|   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
 | |
| 
 | |
| #ifdef XP_WIN
 | |
|   if (timer == mFineGrainResetTimer) {
 | |
|     FineGrainResetTimerNotify();
 | |
|     return NS_OK;
 | |
|   }
 | |
| #endif
 | |
| 
 | |
|   SOCKET_LOG(("EventTokenBucket::Notify() %p\n", this));
 | |
|   mTimerArmed = false;
 | |
|   if (mStopped)
 | |
|     return NS_OK;
 | |
| 
 | |
|   UpdateCredits();
 | |
|   DispatchEvents();
 | |
|   UpdateTimer();
 | |
| 
 | |
|   return NS_OK;
 | |
| }
 | |
| 
 | |
| NS_IMETHODIMP
 | |
| EventTokenBucket::GetName(nsACString& aName)
 | |
| {
 | |
|   aName.AssignLiteral("EventTokenBucket");
 | |
|   return NS_OK;
 | |
| }
 | |
| 
 | |
| void
 | |
| EventTokenBucket::UpdateCredits()
 | |
| {
 | |
|   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
 | |
| 
 | |
|   TimeStamp now = TimeStamp::Now();
 | |
|   TimeDuration elapsed = now - mLastUpdate;
 | |
|   mLastUpdate = now;
 | |
| 
 | |
|   mCredit += static_cast<uint64_t>(elapsed.ToMicroseconds());
 | |
|   if (mCredit > mMaxCredit)
 | |
|     mCredit = mMaxCredit;
 | |
|   SOCKET_LOG(("EventTokenBucket::UpdateCredits %p to %" PRIu64 " (%" PRIu64 " each.. %3.2f)\n",
 | |
|               this, mCredit, mUnitCost, (double)mCredit / mUnitCost));
 | |
| }
 | |
| 
 | |
| #ifdef XP_WIN
 | |
| void
 | |
| EventTokenBucket::FineGrainTimers()
 | |
| {
 | |
|   SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p mFineGrainTimerInUse=%d\n",
 | |
|               this, mFineGrainTimerInUse));
 | |
| 
 | |
|   mLastFineGrainTimerUse = TimeStamp::Now();
 | |
| 
 | |
|   if (mFineGrainTimerInUse)
 | |
|     return;
 | |
| 
 | |
|   if (mUnitCost > kCostFineGrainThreshold)
 | |
|     return;
 | |
| 
 | |
|   SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p timeBeginPeriod()\n",
 | |
|               this));
 | |
| 
 | |
|   mFineGrainTimerInUse = true;
 | |
|   timeBeginPeriod(1);
 | |
| }
 | |
| 
 | |
| void
 | |
| EventTokenBucket::NormalTimers()
 | |
| {
 | |
|   if (!mFineGrainTimerInUse)
 | |
|     return;
 | |
|   mFineGrainTimerInUse = false;
 | |
| 
 | |
|   SOCKET_LOG(("EventTokenBucket::NormalTimers %p timeEndPeriod()\n", this));
 | |
|   timeEndPeriod(1);
 | |
| }
 | |
| 
 | |
| void
 | |
| EventTokenBucket::WantNormalTimers()
 | |
| {
 | |
|     if (!mFineGrainTimerInUse)
 | |
|       return;
 | |
|     if (mFineGrainResetTimerArmed)
 | |
|       return;
 | |
| 
 | |
|     TimeDuration elapsed(TimeStamp::Now() - mLastFineGrainTimerUse);
 | |
|     static const TimeDuration fiveSeconds = TimeDuration::FromSeconds(5);
 | |
| 
 | |
|     if (elapsed >= fiveSeconds) {
 | |
|       NormalTimers();
 | |
|       return;
 | |
|     }
 | |
| 
 | |
|     if (!mFineGrainResetTimer)
 | |
|       mFineGrainResetTimer = NS_NewTimer();
 | |
| 
 | |
|     // if we can't delay the reset, just do it now
 | |
|     if (!mFineGrainResetTimer) {
 | |
|       NormalTimers();
 | |
|       return;
 | |
|     }
 | |
| 
 | |
|     // pad the callback out 100ms to avoid having to round trip this again if the
 | |
|     // timer calls back just a tad early.
 | |
|     SOCKET_LOG(("EventTokenBucket::WantNormalTimers %p "
 | |
|                 "Will reset timer granularity after delay", this));
 | |
| 
 | |
|     mFineGrainResetTimer->InitWithCallback(
 | |
|       this,
 | |
|       static_cast<uint32_t>((fiveSeconds - elapsed).ToMilliseconds()) + 100,
 | |
|       nsITimer::TYPE_ONE_SHOT);
 | |
|     mFineGrainResetTimerArmed = true;
 | |
| }
 | |
| 
 | |
| void
 | |
| EventTokenBucket::FineGrainResetTimerNotify()
 | |
| {
 | |
|   SOCKET_LOG(("EventTokenBucket::FineGrainResetTimerNotify() events = %d\n",
 | |
|               this, mEvents.GetSize()));
 | |
|   mFineGrainResetTimerArmed = false;
 | |
| 
 | |
|   // If we are currently processing events then wait for the queue to drain
 | |
|   // before trying to reset back to normal timers again
 | |
|   if (!mEvents.GetSize())
 | |
|     WantNormalTimers();
 | |
| }
 | |
| 
 | |
| #endif
 | |
| 
 | |
| } // namespace net
 | |
| } // namespace mozilla
 |