Oovcde Index
C++11 Task Queue
Last updated: May 23, 2014
Overview
The Oovcde project needed a task queue where a provider could put work items to
work on in a queue, and multiple consumer threads would process the work items.
One key point is that the queue should not grow too large as the provider is
placing items on the queue, in the case that the consumer's are not processing
the queue as quickly as the provider is adding them.
In other words, the provider should block if all of the consumer threads are busy.
The starting point for this queue is from this article:
Implementing a Thread-Safe Queue using Condition Variables.
Provider Add to Queue
The provider calls the waitPush function. The pseudocode for the waitPush function is:
void OovThreadSafeQueue::waitPush(QueueItem const &workItem)
{
std::unique_lock<std::mutex> lock(mProcessQueueMutex);
// Wait while not empty
while(!isQueueEmpty())
{
// Release lock and wait for signal.
mConsumerPoppedSignal.wait(lock);
// After signaled, lock is reaquired.
}
push_back(workItem);
lock.unlock();
// Signal to waitPop that data is ready.
mProviderPushedSignal.notify_one();
}
The mConsumerPoppedSignal and mProviderPushedSignal are declared in the header file like:
std::condition_variable mConsumerPoppedSignal;
One key aspect is that the wait does not sleep and periodically wake up. It waits
for a signal (either from notify_one or notify_all), and wakes up to add more to the
queue when a consumer thread signals
that an item is read from the queue. The mConsumerPoppedSignal.wait() function will
release the unique_lock, wait for a signal, and then reaquires the lock when the
wait function continues.
The waitPop function that is called from the consumer threads works in a similar manner.
Consumer Read from Queue
Here is the pseudocode for the waitPop function.
bool OovThreadSafeQueue::waitPop(QueueItem &workItem)
{
std::unique_lock lock(mProcessQueueMutex);
bool gotItem = false;
// Wait while empty
while(isQueueEmpty() && !mQuitPopping)
{
// Release lock and wait for signal.
mProviderPushedSignal.wait(lock);
// After signaled, lock is reaquired.
}
// In the normal case this will not be empty.
// If it is empty, then there was a signal, but nothing was
// in the queue. This means that the quit function was called.
gotItem = !isQueueEmpty();
if(gotItem)
{ getFront(item); }
lock.unlock();
mConsumerPoppedSignal.notify_one();
return gotItem;
}
There is an extra piece of code here, and that is the check for mQuitPopping.
This allows the consumer to indicate that the consumer threads can quit waiting,
which will allow the threads to end.
Provider Wait and Quit
This function can set the mQuitPopping flag, and then wait for the queue to become
empty in the same manner as waitPush.
The provider can signal all consumer threads to end waiting for a signal.
At this point, the queue will be empty, but the consumer threads may still
be working on the queued task item.
If the threads are joined by the provider, that means all work has been
completed. Once the threads are joined, then this means there has been a clean
shutdown of all threads.
void OovThreadSafeQueuePrivate::quitPopsPrivate()
{
std::unique_lock lock(mProcessQueueMutex);
mQuitPopping = true;
// Wait to make sure all queue items were processed.
while(!isQueueEmpty())
{
mConsumerPoppedSignal.wait(lock);
}
lock.unlock();
mProviderPushedSignal.notify_all();
}
When they wake up, they will check the mQuitPopping flag, and can quit the wait
function.
Reducing Code Bloat
In the Oovcde project, the thread safe queue will be instantiated many times.
To reduce code bloat, the push and pop functions are moved into a separate
non-template class, and then a template class is used to call them.
An example of how to do this is described here:
Refactoring template bloat
Threads
Once the queue is complete, another class can be written that manages the
threads. The threads can be defined as:
std::vector<std::thread> mWorkerThreads;
Then call push_back with a thread constructor. At the end, then set the
flag to quit the pops after all items have been added to the queue, and
then call join on all threads. The complete example can be seen in the
OovThreadedQueue header and source file in the Oovcde project on both SourceForge
and GitHub. The files are in Github here:
GitHub/oovcde/OovThreadedQueue.h
GitHub/oovcde/OovThreadedQueue.cpp
Other Factors
The MinGW project does not support some of these features yet. But the MinGW-builds
project does, so that the code is portable on both Windows or Linux.
The Oovcde project does not have the cleanest implementation, but is usable and
can be used as an example. I would like to get rid of more of the virtual functions,
and instead use template functions, but haven't completed it yet.
An example is in this article:
Use templates instead of virtual functions