forked from mirrors/gecko-dev
		
	Bug 1454385 - Add a single producer single consumer lock and wait free queue to mfbt/. r=froydnj
MozReview-Commit-ID: 6Dq0GQtYgv2 --HG-- extra : rebase_source : 67bc4245a61c15738e3a6467a03b41e9e29af9ce
This commit is contained in:
		
							parent
							
								
									3d1005e447
								
							
						
					
					
						commit
						515134e9a0
					
				
					 5 changed files with 683 additions and 0 deletions
				
			
		
							
								
								
									
										428
									
								
								mfbt/SPSCQueue.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										428
									
								
								mfbt/SPSCQueue.h
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,428 @@ | |||
| /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ | ||||
| /* vim: set ts=8 sts=2 et sw=2 tw=80: */ | ||||
| /* This Source Code Form is subject to the terms of the Mozilla Public
 | ||||
|  * License, v. 2.0. If a copy of the MPL was not distributed with this | ||||
|  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 | ||||
| 
 | ||||
| /* Single producer single consumer lock-free and wait-free queue. */ | ||||
| 
 | ||||
| #ifndef mozilla_LockFreeQueue_h | ||||
| #define mozilla_LockFreeQueue_h | ||||
| 
 | ||||
| #include "mozilla/Assertions.h" | ||||
| #include "mozilla/Attributes.h" | ||||
| #include "mozilla/PodOperations.h" | ||||
| #include <algorithm> | ||||
| #include <atomic> | ||||
| #include <cstdint> | ||||
| #include <memory> | ||||
| #include <thread> | ||||
| 
 | ||||
| namespace mozilla { | ||||
| 
 | ||||
| namespace details { | ||||
| template<typename T, bool IsPod = std::is_trivial<T>::value> | ||||
| struct MemoryOperations { | ||||
|   /**
 | ||||
|    * This allows zeroing (using memset) or default-constructing a number of | ||||
|    * elements calling the constructors if necessary. | ||||
|    */ | ||||
|   static void ConstructDefault(T* aDestination, size_t aCount); | ||||
|   /**
 | ||||
|    * This allows either moving (if T supports it) or copying a number of | ||||
|    * elements from a `aSource` pointer to a `aDestination` pointer. | ||||
|    * If it is safe to do so and this call copies, this uses PodCopy. Otherwise, | ||||
|    * constructors and destructors are called in a loop. | ||||
|    */ | ||||
|   static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount); | ||||
| }; | ||||
| 
 | ||||
| template<typename T> | ||||
| struct MemoryOperations<T, true> | ||||
| { | ||||
|   static void ConstructDefault(T* aDestination, size_t aCount) | ||||
|   { | ||||
|     PodZero(aDestination, aCount); | ||||
|   } | ||||
|   static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) | ||||
|   { | ||||
|     PodCopy(aDestination, aSource, aCount); | ||||
|   } | ||||
| }; | ||||
| 
 | ||||
| template<typename T> | ||||
| struct MemoryOperations<T, false> | ||||
| { | ||||
|   static void ConstructDefault(T* aDestination, size_t aCount) | ||||
|   { | ||||
|     for (size_t i = 0; i < aCount; i++) { | ||||
|       aDestination[i] = T(); | ||||
|     } | ||||
|   } | ||||
|   static void MoveOrCopy(T* aDestination, T* aSource, size_t aCount) | ||||
|   { | ||||
|     std::move(aSource, aSource + aCount, aDestination); | ||||
|   } | ||||
| }; | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| /**
 | ||||
|  * This data structure allows producing data from one thread, and consuming it | ||||
|  * on another thread, safely and without explicit synchronization. | ||||
|  * | ||||
|  * The role for the producer and the consumer must be constant, i.e., the | ||||
|  * producer should always be on one thread and the consumer should always be on | ||||
|  * another thread. | ||||
|  * | ||||
|  * Some words about the inner workings of this class: | ||||
|  * - Capacity is fixed. Only one allocation is performed, in the constructor. | ||||
|  *   When reading and writing, the return value of the method allows checking if | ||||
|  *   the ring buffer is empty or full. | ||||
|  * - We always keep the read index at least one element ahead of the write | ||||
|  *   index, so we can distinguish between an empty and a full ring buffer: an | ||||
|  *   empty ring buffer is when the write index is at the same position as the | ||||
|  *   read index. A full buffer is when the write index is exactly one position | ||||
|  *   before the read index. | ||||
|  * - We synchronize updates to the read index after having read the data, and | ||||
|  *   the write index after having written the data. This means that the each | ||||
|  *   thread can only touch a portion of the buffer that is not touched by the | ||||
|  *   other thread. | ||||
|  * - Callers are expected to provide buffers. When writing to the queue, | ||||
|  *   elements are copied into the internal storage from the buffer passed in. | ||||
|  *   When reading from the queue, the user is expected to provide a buffer. | ||||
|  *   Because this is a ring buffer, data might not be contiguous in memory; | ||||
|  *   providing an external buffer to copy into is an easy way to have linear | ||||
|  *   data for further processing. | ||||
|  */ | ||||
| template<typename T> | ||||
| class SPSCRingBufferBase | ||||
| { | ||||
| public: | ||||
|   /**
 | ||||
|    * Constructor for a ring buffer. | ||||
|    * | ||||
|    * This performs an allocation on the heap, but is the only allocation that | ||||
|    * will happen for the life time of a `SPSCRingBufferBase`. | ||||
|    * | ||||
|    * @param Capacity The maximum number of element this ring buffer will hold. | ||||
|    */ | ||||
|   explicit | ||||
|   SPSCRingBufferBase(int aCapacity) | ||||
|     : mReadIndex(0) | ||||
|     , mWriteIndex(0) | ||||
|     /* One more element to distinguish from empty and full buffer. */ | ||||
|     , mCapacity(aCapacity + 1) | ||||
|   { | ||||
|     MOZ_ASSERT(StorageCapacity() < std::numeric_limits<int>::max() / 2, | ||||
|               "buffer too large for the type of index used."); | ||||
|     MOZ_ASSERT(mCapacity > 0 && aCapacity != std::numeric_limits<int>::max()); | ||||
| 
 | ||||
|     mData = std::make_unique<T[]>(StorageCapacity()); | ||||
| 
 | ||||
|     std::atomic_thread_fence(std::memory_order::memory_order_seq_cst); | ||||
|   } | ||||
|   /**
 | ||||
|    * Push `aCount` zero or default constructed elements in the array. | ||||
|    * | ||||
|    * Only safely called on the producer thread. | ||||
|    * | ||||
|    * @param count The number of elements to enqueue. | ||||
|    * @return The number of element enqueued. | ||||
|    */ | ||||
|   MOZ_MUST_USE | ||||
|   int EnqueueDefault(int aCount) { | ||||
|     return Enqueue(nullptr, aCount); | ||||
|   } | ||||
|   /**
 | ||||
|    * @brief Put an element in the queue. | ||||
|    * | ||||
|    * Only safely called on the producer thread. | ||||
|    * | ||||
|    * @param element The element to put in the queue. | ||||
|    * | ||||
|    * @return 1 if the element was inserted, 0 otherwise. | ||||
|    */ | ||||
|   MOZ_MUST_USE | ||||
|   int Enqueue(T& aElement) { | ||||
|     return Enqueue(&aElement, 1); | ||||
|   } | ||||
|   /**
 | ||||
|    * Push `aCount` elements in the ring buffer. | ||||
|    * | ||||
|    * Only safely called on the producer thread. | ||||
|    * | ||||
|    * @param elements a pointer to a buffer containing at least `count` elements. | ||||
|    * If `elements` is nullptr, zero or default constructed elements are enqueud. | ||||
|    * @param count The number of elements to read from `elements` | ||||
|    * @return The number of elements successfully coped from `elements` and | ||||
|    * inserted into the ring buffer. | ||||
|    */ | ||||
|   MOZ_MUST_USE | ||||
|   int Enqueue(T* aElements, int aCount) | ||||
|   { | ||||
| #ifdef DEBUG | ||||
|     AssertCorrectThread(mProducerId); | ||||
| #endif | ||||
| 
 | ||||
|     int rdIdx = mReadIndex.load(std::memory_order::memory_order_acquire); | ||||
|     int wrIdx = mWriteIndex.load(std::memory_order::memory_order_relaxed); | ||||
| 
 | ||||
|     if (IsFull(rdIdx, wrIdx)) { | ||||
|       return 0; | ||||
|     } | ||||
| 
 | ||||
|     int toWrite = std::min(AvailableWriteInternal(rdIdx, wrIdx), aCount); | ||||
| 
 | ||||
|     /* First part, from the write index to the end of the array. */ | ||||
|     int firstPart = std::min(StorageCapacity() - wrIdx, toWrite); | ||||
|     /* Second part, from the beginning of the array */ | ||||
|     int secondPart = toWrite - firstPart; | ||||
| 
 | ||||
|     if (aElements) { | ||||
|       details::MemoryOperations<T>::MoveOrCopy(mData.get() + wrIdx, aElements, firstPart); | ||||
|       details::MemoryOperations<T>::MoveOrCopy(mData.get(), aElements + firstPart, secondPart); | ||||
|     } else { | ||||
|       details::MemoryOperations<T>::ConstructDefault(mData.get() + wrIdx, firstPart); | ||||
|       details::MemoryOperations<T>::ConstructDefault(mData.get(), secondPart); | ||||
|     } | ||||
| 
 | ||||
|     mWriteIndex.store(IncrementIndex(wrIdx, toWrite), | ||||
|                       std::memory_order::memory_order_release); | ||||
| 
 | ||||
|     return toWrite; | ||||
|   } | ||||
|   /**
 | ||||
|    * Retrieve at most `count` elements from the ring buffer, and copy them to | ||||
|    * `elements`, if non-null. | ||||
|    * | ||||
|    * Only safely called on the consumer side. | ||||
|    * | ||||
|    * @param elements A pointer to a buffer with space for at least `count` | ||||
|    * elements. If `elements` is `nullptr`, `count` element will be discarded. | ||||
|    * @param count The maximum number of elements to Dequeue. | ||||
|    * @return The number of elements written to `elements`. | ||||
|    */ | ||||
|   MOZ_MUST_USE | ||||
|   int Dequeue(T* elements, int count) | ||||
|   { | ||||
| #ifdef DEBUG | ||||
|     AssertCorrectThread(mConsumerId); | ||||
| #endif | ||||
| 
 | ||||
|     int wrIdx = mWriteIndex.load(std::memory_order::memory_order_acquire); | ||||
|     int rdIdx = mReadIndex.load(std::memory_order::memory_order_relaxed); | ||||
| 
 | ||||
|     if (IsEmpty(rdIdx, wrIdx)) { | ||||
|       return 0; | ||||
|     } | ||||
| 
 | ||||
|     int toRead = std::min(AvailableReadInternal(rdIdx, wrIdx), count); | ||||
| 
 | ||||
|     int firstPart = std::min(StorageCapacity() - rdIdx, toRead); | ||||
|     int secondPart = toRead - firstPart; | ||||
| 
 | ||||
|     if (elements) { | ||||
|       details::MemoryOperations<T>::MoveOrCopy(elements, mData.get() + rdIdx, firstPart); | ||||
|       details::MemoryOperations<T>::MoveOrCopy(elements + firstPart, mData.get(), secondPart); | ||||
|     } | ||||
| 
 | ||||
|     mReadIndex.store(IncrementIndex(rdIdx, toRead), | ||||
|                      std::memory_order::memory_order_release); | ||||
| 
 | ||||
|     return toRead; | ||||
|   } | ||||
|   /**
 | ||||
|    * Get the number of available elements for consuming. | ||||
|    * | ||||
|    * Only safely called on the consumer thread. This can be less than the actual | ||||
|    * number of elements in the queue, since the mWriteIndex is updated at the | ||||
|    * very end of the Enqueue method on the producer thread, but consequently | ||||
|    * always returns a number of elements such that a call to Dequeue return this | ||||
|    * number of elements. | ||||
|    * | ||||
|    * @return The number of available elements for reading. | ||||
|    */ | ||||
|   int AvailableRead() const | ||||
|   { | ||||
| #ifdef DEBUG | ||||
|     AssertCorrectThread(mConsumerId); | ||||
| #endif | ||||
|     return AvailableReadInternal( | ||||
|       mReadIndex.load(std::memory_order::memory_order_relaxed), | ||||
|       mWriteIndex.load(std::memory_order::memory_order_relaxed)); | ||||
|   } | ||||
|   /**
 | ||||
|    * Get the number of available elements for writing. | ||||
|    * | ||||
|    * Only safely called on the producer thread. This can be less than than the | ||||
|    * actual number of slots that are available, because mReadIndex is update at | ||||
|    * the very end of the Deque method. It always returns a number such that a | ||||
|    * call to Enqueue with this number will succeed in enqueuing this number of | ||||
|    * elements. | ||||
|    * | ||||
|    * @return The number of empty slots in the buffer, available for writing. | ||||
|    */ | ||||
|   int AvailableWrite() const | ||||
|   { | ||||
| #ifdef DEBUG | ||||
|     AssertCorrectThread(mProducerId); | ||||
| #endif | ||||
|     return AvailableWriteInternal( | ||||
|       mReadIndex.load(std::memory_order::memory_order_relaxed), | ||||
|       mWriteIndex.load(std::memory_order::memory_order_relaxed)); | ||||
|   } | ||||
|   /**
 | ||||
|    * Get the total Capacity, for this ring buffer. | ||||
|    * | ||||
|    * Can be called safely on any thread. | ||||
|    * | ||||
|    * @return The maximum Capacity of this ring buffer. | ||||
|    */ | ||||
|   int Capacity() const { return StorageCapacity() - 1; } | ||||
|   /**
 | ||||
|    * Reset the consumer and producer thread identifier, in case the threads are | ||||
|    * being changed. This has to be externally synchronized. This is no-op when | ||||
|    * asserts are disabled. | ||||
|    */ | ||||
|   void ResetThreadIds() | ||||
|   { | ||||
| #ifdef DEBUG | ||||
|     mConsumerId = mProducerId = std::thread::id(); | ||||
| #endif | ||||
|   } | ||||
| private: | ||||
|   /** Return true if the ring buffer is empty.
 | ||||
|    * | ||||
|    * This can be called from the consumer or the producer thread. | ||||
|    * | ||||
|    * @param aReadIndex the read index to consider | ||||
|    * @param writeIndex the write index to consider | ||||
|    * @return true if the ring buffer is empty, false otherwise. | ||||
|    **/ | ||||
|   bool IsEmpty(int aReadIndex, int aWriteIndex) const | ||||
|   { | ||||
|     return aWriteIndex == aReadIndex; | ||||
|   } | ||||
|   /** Return true if the ring buffer is full.
 | ||||
|    * | ||||
|    * This happens if the write index is exactly one element behind the read | ||||
|    * index. | ||||
|    * | ||||
|    * This can be called from the consummer or the producer thread. | ||||
|    * | ||||
|    * @param aReadIndex the read index to consider | ||||
|    * @param writeIndex the write index to consider | ||||
|    * @return true if the ring buffer is full, false otherwise. | ||||
|    **/ | ||||
|   bool IsFull(int aReadIndex, int aWriteIndex) const | ||||
|   { | ||||
|     return (aWriteIndex + 1) % StorageCapacity() == aReadIndex; | ||||
|   } | ||||
|   /**
 | ||||
|    * Return the size of the storage. It is one more than the number of elements | ||||
|    * that can be stored in the buffer. | ||||
|    * | ||||
|    * This can be called from any thread. | ||||
|    * | ||||
|    * @return the number of elements that can be stored in the buffer. | ||||
|    */ | ||||
|   int StorageCapacity() const { return mCapacity; } | ||||
|   /**
 | ||||
|    * Returns the number of elements available for reading. | ||||
|    * | ||||
|    * This can be called from the consummer or producer thread, but see the | ||||
|    * comment in `AvailableRead`. | ||||
|    * | ||||
|    * @return the number of available elements for reading. | ||||
|    */ | ||||
|   int AvailableReadInternal(int aReadIndex, int aWriteIndex) const | ||||
|   { | ||||
|     if (aWriteIndex >= aReadIndex) { | ||||
|       return aWriteIndex - aReadIndex; | ||||
|     } else { | ||||
|       return aWriteIndex + StorageCapacity() - aReadIndex; | ||||
|     } | ||||
|   } | ||||
|   /**
 | ||||
|    * Returns the number of empty elements, available for writing. | ||||
|    * | ||||
|    * This can be called from the consummer or producer thread, but see the | ||||
|    * comment in `AvailableWrite`. | ||||
|    * | ||||
|    * @return the number of elements that can be written into the array. | ||||
|    */ | ||||
|   int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const | ||||
|   { | ||||
|     /* We subtract one element here to always keep at least one sample
 | ||||
|      * free in the buffer, to distinguish between full and empty array. */ | ||||
|     int rv = aReadIndex - aWriteIndex - 1; | ||||
|     if (aWriteIndex >= aReadIndex) { | ||||
|       rv += StorageCapacity(); | ||||
|     } | ||||
|     return rv; | ||||
|   } | ||||
|   /**
 | ||||
|    * Increments an index, wrapping it around the storage. | ||||
|    * | ||||
|    * Incrementing `mWriteIndex` can be done on the producer thread. | ||||
|    * Incrementing `mReadIndex` can be done on the consummer thread. | ||||
|    * | ||||
|    * @param index a reference to the index to increment. | ||||
|    * @param increment the number by which `index` is incremented. | ||||
|    * @return the new index. | ||||
|    */ | ||||
|   int IncrementIndex(int aIndex, int aIncrement) const | ||||
|   { | ||||
|     MOZ_ASSERT(aIncrement >= 0 && | ||||
|                aIncrement < StorageCapacity() && | ||||
|                aIndex < StorageCapacity()); | ||||
|     return (aIndex + aIncrement) % StorageCapacity(); | ||||
|   } | ||||
|   /**
 | ||||
|    * @brief This allows checking that Enqueue (resp. Dequeue) are always | ||||
|    * called by the right thread. | ||||
|    * | ||||
|    * The role of the thread are assigned the first time they call Enqueue or | ||||
|    * Dequeue, and cannot change, except when ResetThreadIds is called.. | ||||
|    * | ||||
|    * @param id the id of the thread that has called the calling method first. | ||||
|    */ | ||||
| #ifdef DEBUG | ||||
|   static void AssertCorrectThread(std::thread::id& aId) | ||||
|   { | ||||
|     if (aId == std::thread::id()) { | ||||
|       aId = std::this_thread::get_id(); | ||||
|       return; | ||||
|     } | ||||
|     MOZ_ASSERT(aId == std::this_thread::get_id()); | ||||
|   } | ||||
| #endif | ||||
|   /** Index at which the oldest element is. */ | ||||
|   std::atomic<int> mReadIndex; | ||||
|   /** Index at which to write new elements. `mWriteIndex` is always at
 | ||||
|    * least one element ahead of `mReadIndex`. */ | ||||
|   std::atomic<int> mWriteIndex; | ||||
|   /** Maximum number of elements that can be stored in the ring buffer. */ | ||||
|   const int mCapacity; | ||||
|   /** Data storage, of size `mCapacity + 1` */ | ||||
|   std::unique_ptr<T[]> mData; | ||||
| #ifdef DEBUG | ||||
|   /** The id of the only thread that is allowed to read from the queue. */ | ||||
|   mutable std::thread::id mConsumerId; | ||||
|   /** The id of the only thread that is allowed to write from the queue. */ | ||||
|   mutable std::thread::id mProducerId; | ||||
| #endif | ||||
| }; | ||||
| 
 | ||||
| /**
 | ||||
|  * Instantiation of the `SPSCRingBufferBase` type. This is safe to use | ||||
|  * from two threads, one producer, one consumer (that never change role), | ||||
|  * without explicit synchronization. | ||||
|  */ | ||||
| template<typename T> | ||||
| using SPSCQueue = SPSCRingBufferBase<T>; | ||||
| 
 | ||||
| } // namespace mozilla
 | ||||
| 
 | ||||
| #endif // mozilla_LockFreeQueue_h
 | ||||
|  | @ -88,6 +88,7 @@ EXPORTS.mozilla = [ | |||
|     'Span.h', | ||||
|     'SplayTree.h', | ||||
|     'Sprintf.h', | ||||
|     'SPSCQueue.h', | ||||
|     'StaticAnalysisFunctions.h', | ||||
|     'TaggedAnonymousMemory.h', | ||||
|     'TemplateLib.h', | ||||
|  |  | |||
							
								
								
									
										251
									
								
								mfbt/tests/TestSPSCQueue.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										251
									
								
								mfbt/tests/TestSPSCQueue.cpp
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,251 @@ | |||
| /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ | ||||
| /* vim: set ts=8 sts=2 et sw=2 tw=80: */ | ||||
| /* This Source Code Form is subject to the terms of the Mozilla Public
 | ||||
|  * License, v. 2.0. If a copy of the MPL was not distributed with this | ||||
|  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 | ||||
| 
 | ||||
| #include "mozilla/SPSCQueue.h" | ||||
| #include "mozilla/PodOperations.h" | ||||
| #include <vector> | ||||
| #include <iostream> | ||||
| #include <thread> | ||||
| #include <chrono> | ||||
| #include <memory> | ||||
| #include <string> | ||||
| 
 | ||||
| using namespace mozilla; | ||||
| 
 | ||||
| /* Generate a monotonically increasing sequence of numbers. */ | ||||
| template<typename T> | ||||
| class SequenceGenerator | ||||
| { | ||||
| public: | ||||
|   SequenceGenerator() | ||||
|   { } | ||||
|   void Get(T * aElements, size_t aCount) | ||||
|   { | ||||
|     for (size_t i = 0; i < aCount; i++) { | ||||
|       aElements[i] = static_cast<T>(mIndex); | ||||
|       mIndex++; | ||||
|     } | ||||
|   } | ||||
|   void Rewind(size_t aCount) | ||||
|   { | ||||
|     mIndex -= aCount; | ||||
|   } | ||||
| private: | ||||
|   size_t mIndex = 0; | ||||
| }; | ||||
| 
 | ||||
| /* Checks that a sequence is monotonically increasing. */ | ||||
| template<typename T> | ||||
| class SequenceVerifier | ||||
| { | ||||
| public: | ||||
|   SequenceVerifier() | ||||
|   { } | ||||
|   void Check(T * aElements, size_t aCount) | ||||
|   { | ||||
|     for (size_t i = 0; i < aCount; i++) { | ||||
|       if (aElements[i] != static_cast<T>(mIndex)) { | ||||
|         std::cerr << "Element " << i << " is different. Expected " | ||||
|           << static_cast<T>(mIndex) << ", got " << aElements[i] | ||||
|           << "." << std::endl; | ||||
|         MOZ_RELEASE_ASSERT(false); | ||||
|       } | ||||
|       mIndex++; | ||||
|     } | ||||
|   } | ||||
| private: | ||||
|   size_t mIndex = 0; | ||||
| }; | ||||
| 
 | ||||
| const int BLOCK_SIZE = 127; | ||||
| 
 | ||||
| template<typename T> | ||||
| void TestRing(int capacity) | ||||
| { | ||||
|   SPSCQueue<T> buf(capacity); | ||||
|   std::unique_ptr<T[]> seq(new T[capacity]); | ||||
|   SequenceGenerator<T> gen; | ||||
|   SequenceVerifier<T> checker; | ||||
| 
 | ||||
|   int iterations = 1002; | ||||
| 
 | ||||
|   while(iterations--) { | ||||
|     gen.Get(seq.get(), BLOCK_SIZE); | ||||
|     int rv = buf.Enqueue(seq.get(), BLOCK_SIZE); | ||||
|     MOZ_RELEASE_ASSERT(rv == BLOCK_SIZE); | ||||
|     PodZero(seq.get(), BLOCK_SIZE); | ||||
|     rv = buf.Dequeue(seq.get(), BLOCK_SIZE); | ||||
|     MOZ_RELEASE_ASSERT(rv == BLOCK_SIZE); | ||||
|     checker.Check(seq.get(), BLOCK_SIZE); | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| template<typename T> | ||||
| void TestRingMultiThread(int capacity) | ||||
| { | ||||
|   SPSCQueue<T> buf(capacity); | ||||
|   SequenceVerifier<T> checker; | ||||
|   std::unique_ptr<T[]> outBuffer(new T[capacity]); | ||||
| 
 | ||||
|   std::thread t([&buf, capacity] { | ||||
|     int iterations = 1002; | ||||
|     std::unique_ptr<T[]> inBuffer(new T[capacity]); | ||||
|     SequenceGenerator<T> gen; | ||||
| 
 | ||||
|     while(iterations--) { | ||||
|       std::this_thread::sleep_for(std::chrono::microseconds(10)); | ||||
|       gen.Get(inBuffer.get(), BLOCK_SIZE); | ||||
|       int rv = buf.Enqueue(inBuffer.get(), BLOCK_SIZE); | ||||
|       MOZ_RELEASE_ASSERT(rv <= BLOCK_SIZE); | ||||
|       if (rv != BLOCK_SIZE) { | ||||
|         gen.Rewind(BLOCK_SIZE - rv); | ||||
|       } | ||||
|     } | ||||
|   }); | ||||
| 
 | ||||
|   int remaining = 1002; | ||||
| 
 | ||||
|   while(remaining--) { | ||||
|     std::this_thread::sleep_for(std::chrono::microseconds(10)); | ||||
|     int rv = buf.Dequeue(outBuffer.get(), BLOCK_SIZE); | ||||
|     MOZ_RELEASE_ASSERT(rv <= BLOCK_SIZE); | ||||
|     checker.Check(outBuffer.get(), rv); | ||||
|   } | ||||
| 
 | ||||
|   t.join(); | ||||
| } | ||||
| 
 | ||||
| template<typename T> | ||||
| void BasicAPITest(T& ring) | ||||
| { | ||||
|   MOZ_RELEASE_ASSERT(ring.Capacity() == 128); | ||||
| 
 | ||||
|   MOZ_RELEASE_ASSERT(ring.AvailableRead() == 0); | ||||
|   MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 128); | ||||
| 
 | ||||
|   int rv = ring.EnqueueDefault(63); | ||||
| 
 | ||||
|   MOZ_RELEASE_ASSERT(rv == 63); | ||||
|   MOZ_RELEASE_ASSERT(ring.AvailableRead() == 63); | ||||
|   MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 65); | ||||
| 
 | ||||
|   rv = ring.EnqueueDefault(65); | ||||
| 
 | ||||
|   MOZ_RELEASE_ASSERT(rv == 65); | ||||
|   MOZ_RELEASE_ASSERT(ring.AvailableRead() == 128); | ||||
|   MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 0); | ||||
| 
 | ||||
|   rv = ring.Dequeue(nullptr, 63); | ||||
| 
 | ||||
|   MOZ_RELEASE_ASSERT(ring.AvailableRead() == 65); | ||||
|   MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 63); | ||||
| 
 | ||||
|   rv = ring.Dequeue(nullptr, 65); | ||||
| 
 | ||||
|   MOZ_RELEASE_ASSERT(ring.AvailableRead() == 0); | ||||
|   MOZ_RELEASE_ASSERT(ring.AvailableWrite() == 128); | ||||
| } | ||||
| 
 | ||||
| const size_t RING_BUFFER_SIZE = 128; | ||||
| const size_t ENQUEUE_SIZE = RING_BUFFER_SIZE / 2; | ||||
| 
 | ||||
| void TestResetAPI() { | ||||
|   SPSCQueue<float> ring(RING_BUFFER_SIZE); | ||||
|   std::thread t([&ring] { | ||||
|     std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]); | ||||
|     int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE); | ||||
|     MOZ_RELEASE_ASSERT(rv > 0); | ||||
|   }); | ||||
| 
 | ||||
|   t.join(); | ||||
| 
 | ||||
|   ring.ResetThreadIds(); | ||||
| 
 | ||||
|   // Enqueue with a different thread. We have reset the thread ID
 | ||||
|   // in the ring buffer, this should work.
 | ||||
|   std::thread t2([&ring] { | ||||
|     std::unique_ptr<float[]> inBuffer(new float[ENQUEUE_SIZE]); | ||||
|     int rv = ring.Enqueue(inBuffer.get(), ENQUEUE_SIZE); | ||||
|     MOZ_RELEASE_ASSERT(rv > 0); | ||||
|   }); | ||||
| 
 | ||||
|   t2.join(); | ||||
| } | ||||
| 
 | ||||
| void | ||||
| TestMove() | ||||
| { | ||||
|   const size_t ELEMENT_COUNT = 16; | ||||
|   struct Thing { | ||||
|     Thing() | ||||
|       : mStr("") | ||||
|     { } | ||||
|     explicit | ||||
|     Thing(const std::string& aStr) | ||||
|       :mStr(aStr) | ||||
|     { } | ||||
|     Thing(Thing&& aOtherThing) | ||||
|     { | ||||
|       mStr = std::move(aOtherThing.mStr); | ||||
|       // aOtherThing.mStr.clear();
 | ||||
|     } | ||||
|     Thing& operator=(Thing&& aOtherThing) | ||||
|     { | ||||
|       mStr = std::move(aOtherThing.mStr); | ||||
|       return *this; | ||||
|     } | ||||
|     std::string mStr; | ||||
|   }; | ||||
| 
 | ||||
|   std::vector<Thing> vec_in; | ||||
|   std::vector<Thing> vec_out; | ||||
| 
 | ||||
|   for (uint32_t i = 0; i < ELEMENT_COUNT; i++) { | ||||
|     vec_in.push_back(Thing(std::to_string(i))); | ||||
|     vec_out.push_back(Thing()); | ||||
|   } | ||||
| 
 | ||||
|   SPSCQueue<Thing> queue(ELEMENT_COUNT); | ||||
| 
 | ||||
|   int rv = queue.Enqueue(&vec_in[0], ELEMENT_COUNT); | ||||
|   MOZ_RELEASE_ASSERT(rv == ELEMENT_COUNT); | ||||
| 
 | ||||
|   // Check that we've moved the std::string into the queue.
 | ||||
|   for (uint32_t i = 0; i < ELEMENT_COUNT; i++) { | ||||
|     MOZ_RELEASE_ASSERT(vec_in[i].mStr.empty()); | ||||
|   } | ||||
| 
 | ||||
|   rv = queue.Dequeue(&vec_out[0], ELEMENT_COUNT); | ||||
|   MOZ_RELEASE_ASSERT(rv == ELEMENT_COUNT); | ||||
| 
 | ||||
|   for (uint32_t i = 0; i < ELEMENT_COUNT; i++) { | ||||
|     MOZ_RELEASE_ASSERT(std::stoul(vec_out[i].mStr) == i); | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| int main() | ||||
| { | ||||
|   const int minCapacity = 199; | ||||
|   const int maxCapacity = 1277; | ||||
|   const int capacityIncrement = 27; | ||||
| 
 | ||||
|   SPSCQueue<float> q1(128); | ||||
|   BasicAPITest(q1); | ||||
|   SPSCQueue<char> q2(128); | ||||
|   BasicAPITest(q2); | ||||
| 
 | ||||
|   for (uint32_t i = minCapacity; i < maxCapacity; i+=capacityIncrement) { | ||||
|     TestRing<uint32_t>(i); | ||||
|     TestRingMultiThread<uint32_t>(i); | ||||
|     TestRing<float>(i); | ||||
|     TestRingMultiThread<float>(i); | ||||
|   } | ||||
| 
 | ||||
|   TestResetAPI(); | ||||
|   TestMove(); | ||||
| 
 | ||||
|   return 0; | ||||
| } | ||||
|  | @ -51,6 +51,7 @@ CppUnitTests([ | |||
|     'TestSHA1', | ||||
|     'TestSmallPointerArray', | ||||
|     'TestSplayTree', | ||||
|     'TestSPSCQueue', | ||||
|     'TestTemplateLib', | ||||
|     'TestTextUtils', | ||||
|     'TestThreadSafeWeakPtr', | ||||
|  |  | |||
|  | @ -42,6 +42,8 @@ skip-if = os == 'android' # Bug 1147630 | |||
| [TestSmallPointerArray] | ||||
| [TestSaturate] | ||||
| [TestSplayTree] | ||||
| [TestSPSCQueue] | ||||
| skip-if = os == 'linux' # Bug 1464084 | ||||
| [TestSyncRunnable] | ||||
| [TestTemplateLib] | ||||
| [TestTuple] | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue
	
	 Paul Adenot
						Paul Adenot