
C++11 Task Queue

Last updated: May 23, 2014

Overview

The Oovcde project needed a task queue in which a provider places work items and multiple consumer threads process them. One key requirement is that the queue must not grow too large when the consumers are not processing items as quickly as the provider adds them. In other words, the provider should block when all of the consumer threads are busy.

The starting point for this queue is the article: Implementing a Thread-Safe Queue using Condition Variables.

Provider Add to Queue

The provider calls the waitPush function. The pseudocode for the waitPush function is:

void OovThreadSafeQueue::waitPush(QueueItem const &workItem)
    {
    std::unique_lock<std::mutex> lock(mProcessQueueMutex);
    // Wait while not empty
    while(!isQueueEmpty())
        {
        // Release lock and wait for signal.
        mConsumerPoppedSignal.wait(lock);
        // After being signaled, the lock is reacquired.
        }
    push_back(workItem);
    lock.unlock();
    // Signal to waitPop that data is ready.
    mProviderPushedSignal.notify_one();
    }

The mConsumerPoppedSignal and mProviderPushedSignal members are declared in the header file like this:
    std::condition_variable  mConsumerPoppedSignal;
    std::condition_variable  mProviderPushedSignal;

One key aspect is that the wait does not sleep and wake up periodically to poll the queue. It blocks until it is signaled (by either notify_one or notify_all), so the provider wakes up to add more to the queue when a consumer thread signals that an item has been read from the queue. The mConsumerPoppedSignal.wait() function releases the unique_lock, waits for a signal, and then reacquires the lock before the wait function returns. Because a condition variable can also wake up spuriously, the wait sits inside a while loop that rechecks the queue state.
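The wait behavior described above can be tried in isolation. The following is a minimal sketch, not code from Oovcde: the Handoff struct and runHandoffDemo function are hypothetical names made up for illustration. It uses the predicate overload of std::condition_variable::wait, which behaves like the explicit while loop shown in waitPush.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical demo state; none of these names come from Oovcde.
struct Handoff
    {
    std::mutex mMutex;
    std::condition_variable mSignal;
    bool mReady = false;
    int mValue = 0;
    };

// Returns the value that the waiting thread observed.
int runHandoffDemo()
    {
    Handoff handoff;
    int seen = -1;
    std::thread waiter([&handoff, &seen]()
        {
        std::unique_lock<std::mutex> lock(handoff.mMutex);
        // Same as: while(!handoff.mReady) { handoff.mSignal.wait(lock); }
        // The predicate is rechecked after every wakeup, so a spurious
        // wakeup cannot let the thread continue too soon.
        handoff.mSignal.wait(lock, [&handoff]() { return handoff.mReady; });
        seen = handoff.mValue;
        });
        {
        // Change the shared state while holding the mutex.
        std::lock_guard<std::mutex> lock(handoff.mMutex);
        handoff.mValue = 42;
        handoff.mReady = true;
        }
    // Notify after the lock is released, as waitPush does.
    handoff.mSignal.notify_one();
    waiter.join();
    return seen;
    }
```

Calling runHandoffDemo() from main should return the value that was stored before the notification. If the notification arrives before the waiter starts waiting, the predicate is already true and the wait returns immediately.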

The waitPop function that is called from the consumer threads works in a similar manner.

Consumer Read from Queue

Here is the pseudocode for the waitPop function.
bool OovThreadSafeQueue::waitPop(QueueItem &workItem)
    {
    std::unique_lock<std::mutex> lock(mProcessQueueMutex);
    bool gotItem = false;
    // Wait while empty
    while(isQueueEmpty() && !mQuitPopping)
        {
        // Release lock and wait for signal.
        mProviderPushedSignal.wait(lock);
        // After being signaled, the lock is reacquired.
        }

    // In the normal case this will not be empty.
    // If it is empty, then there was a signal, but nothing was
    // in the queue. This means that the quit function was called.
    gotItem = !isQueueEmpty();
    if(gotItem)
        { getFront(workItem); }

    lock.unlock();
    mConsumerPoppedSignal.notify_one();
    return gotItem;
    }
There is an extra piece of code here, and that is the check for mQuitPopping. This allows the provider to indicate that the consumer threads can quit waiting, which allows the threads to end.

Provider Wait and Quit

The provider's quit function sets the mQuitPopping flag, and then waits for the queue to become empty in the same manner as waitPush. It then signals all consumer threads to stop waiting for a signal. At this point, the queue is empty, but the consumer threads may still be working on the items they took from the queue. Once the provider has joined the threads, all work has been completed and all threads have shut down cleanly.
void OovThreadSafeQueuePrivate::quitPopsPrivate()
    {
    std::unique_lock<std::mutex> lock(mProcessQueueMutex);
    mQuitPopping = true;
    // Wait to make sure all queue items were processed.
    while(!isQueueEmpty())
        {
        mConsumerPoppedSignal.wait(lock);
        }
    lock.unlock();
    mProviderPushedSignal.notify_all();
    }
When the consumer threads wake up, they check the mQuitPopping flag and can leave the wait loop. Since the queue is empty, waitPop then returns false.
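The three functions fit together as in the following sketch. It is a simplified stand-in, not the Oovcde source: the SketchQueue class, its member names, the int item type, and runQueueDemo are all assumptions made for illustration, and the while loops are written with the predicate form of wait.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical, simplified stand-in for the Oovcde queue; items are ints.
class SketchQueue
    {
    public:
        // Blocks until a consumer has taken the previous item.
        void waitPush(int item)
            {
            std::unique_lock<std::mutex> lock(mMutex);
            mPopped.wait(lock, [this]() { return mItems.empty(); });
            mItems.push_back(item);
            lock.unlock();
            mPushed.notify_one();
            }
        // Returns false once quitPops was called and nothing is left.
        bool waitPop(int &item)
            {
            std::unique_lock<std::mutex> lock(mMutex);
            mPushed.wait(lock, [this]() { return !mItems.empty() || mQuit; });
            bool gotItem = !mItems.empty();
            if(gotItem)
                {
                item = mItems.front();
                mItems.pop_front();
                }
            lock.unlock();
            mPopped.notify_one();
            return gotItem;
            }
        // Sets the quit flag, waits for the queue to drain, wakes all consumers.
        void quitPops()
            {
            std::unique_lock<std::mutex> lock(mMutex);
            mQuit = true;
            mPopped.wait(lock, [this]() { return mItems.empty(); });
            lock.unlock();
            mPushed.notify_all();
            }
    private:
        std::mutex mMutex;
        std::condition_variable mPushed;
        std::condition_variable mPopped;
        std::deque<int> mItems;
        bool mQuit = false;
    };

// One provider (the calling thread) and one consumer; returns the sum consumed.
int runQueueDemo()
    {
    SketchQueue queue;
    int sum = 0;
    std::thread consumer([&queue, &sum]()
        {
        int item = 0;
        while(queue.waitPop(item))
            { sum += item; }
        });
    for(int i = 1; i <= 10; i++)
        { queue.waitPush(i); }
    queue.quitPops();
    consumer.join();    // After the join, all work is complete.
    return sum;
    }
```

The sketch assumes a single provider thread, as in the article, so notify_one on the popped signal is enough to wake it. The sum is read only after join, which is what makes the unsynchronized access to it safe.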

Reducing Code Bloat

In the Oovcde project, the thread-safe queue is instantiated many times. To reduce code bloat, the push and pop functions are moved into a separate non-template class, and a thin template class is used to call them. An example of how to do this is described here: Refactoring template bloat
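One way such a split can look is sketched here. This is an assumption about the general shape, not a copy of the Oovcde classes: QueueCore, TypedQueue, and runTypedQueueDemo are hypothetical names, and the quit handling is left out to keep the sketch short. The locking and waiting logic is compiled once in the non-template class, and only small forwarding functions are instantiated per item type.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

// Non-template part: compiled once, holds all locking and waiting logic.
class QueueCore
    {
    public:
        virtual ~QueueCore() {}
    protected:
        void waitPushPrivate(void const *item)
            {
            std::unique_lock<std::mutex> lock(mMutex);
            while(!isQueueEmpty())
                { mPopped.wait(lock); }
            pushBack(item);
            lock.unlock();
            mPushed.notify_one();
            }
        void waitPopPrivate(void *item)
            {
            std::unique_lock<std::mutex> lock(mMutex);
            while(isQueueEmpty())
                { mPushed.wait(lock); }
            getFront(item);
            lock.unlock();
            mPopped.notify_one();
            }
    private:
        // Type-specific storage operations supplied by the template class.
        virtual bool isQueueEmpty() const = 0;
        virtual void pushBack(void const *item) = 0;
        virtual void getFront(void *item) = 0;
        std::mutex mMutex;
        std::condition_variable mPushed;
        std::condition_variable mPopped;
    };

// Template part: only thin, type-specific forwarding is instantiated per T.
template<typename T> class TypedQueue : public QueueCore
    {
    public:
        void waitPush(T const &item) { waitPushPrivate(&item); }
        void waitPop(T &item) { waitPopPrivate(&item); }
    private:
        bool isQueueEmpty() const override { return mItems.empty(); }
        void pushBack(void const *item) override
            { mItems.push_back(*static_cast<T const *>(item)); }
        void getFront(void *item) override
            {
            *static_cast<T *>(item) = mItems.front();
            mItems.pop_front();
            }
        std::deque<T> mItems;
    };

// Single-threaded check: each push finds an empty queue, each pop finds an item.
bool runTypedQueueDemo()
    {
    TypedQueue<int> numbers;
    TypedQueue<std::string> names;
    numbers.waitPush(7);
    names.waitPush("oovcde");
    int number = 0;
    std::string name;
    numbers.waitPop(number);
    names.waitPop(name);
    return number == 7 && name == "oovcde";
    }
```

The trade-off in this sketch is that the shared code reaches the typed storage through virtual functions and void pointers, so type safety depends on the template wrapper being the only caller.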

Threads

Once the queue is complete, another class can be written that manages the threads. The threads can be defined as:
   std::vector<std::thread> mWorkerThreads;
Then call push_back with a thread constructor for each worker. After all items have been added to the queue, set the flag to quit the pops, and then call join on all threads. The complete example can be seen in the OovThreadedQueue header and source files in the Oovcde project on both SourceForge and GitHub. The files are on GitHub here:
GitHub/oovcde/OovThreadedQueue.h
GitHub/oovcde/OovThreadedQueue.cpp
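The thread handling described in this section can be sketched as follows. To keep the example short, the task queue is replaced by an atomic counter that hands out item indices, so this shows only the creation and joining of the threads, not the queue itself. SketchWorkers, its members, and runWorkersDemo are hypothetical names, not the Oovcde classes.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical manager class; the name and members are not from Oovcde.
class SketchWorkers
    {
    public:
        // Starts numThreads workers that claim item indices 0..numItems-1.
        SketchWorkers(int numThreads, int numItems):
            mNumItems(numItems), mNextItem(0), mProcessed(0)
            {
            for(int i = 0; i < numThreads; i++)
                {
                // push_back with a thread constructor.
                mWorkerThreads.push_back(
                    std::thread(&SketchWorkers::work, this));
                }
            }
        // Joins every worker and returns how many items were processed.
        int waitForCompletion()
            {
            for(auto &worker : mWorkerThreads)
                {
                if(worker.joinable())
                    { worker.join(); }
                }
            return mProcessed.load();
            }
        // A std::thread must be joined (or detached) before it is destroyed.
        ~SketchWorkers() { waitForCompletion(); }
    private:
        void work()
            {
            // fetch_add hands each index to exactly one thread.
            while(mNextItem.fetch_add(1) < mNumItems)
                { mProcessed.fetch_add(1); }
            }
        int mNumItems;
        std::atomic<int> mNextItem;
        std::atomic<int> mProcessed;
        std::vector<std::thread> mWorkerThreads;
    };

// Runs four workers over one hundred items and returns the processed count.
int runWorkersDemo()
    {
    SketchWorkers workers(4, 100);
    return workers.waitForCompletion();
    }
```

In a version built on the task queue, the work function would loop on waitPop instead of the counter, and waitForCompletion would set the quit flag before joining.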

Other Factors

The MinGW project does not support some of these features yet, but the MinGW-builds project does, so the code is portable across both Windows and Linux.
The Oovcde project does not have the cleanest implementation, but it is usable and can serve as an example. I would like to get rid of more of the virtual functions, and instead use template functions, but haven't completed it yet. An example is in this article: Use templates instead of virtual functions
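As a rough illustration of the idea of using templates instead of virtual functions, the sketch below lets a template function accept any type that has a matching member function, so no base class or virtual call is needed. Doubler, Negator, and processAll are hypothetical names; this is not the planned Oovcde change, only the general technique.

```cpp
#include <cassert>
#include <vector>

// Any type with a process(int) member works; no common base class is needed.
struct Doubler
    {
    int process(int item) const { return item * 2; }
    };

struct Negator
    {
    int process(int item) const { return -item; }
    };

// The call to process() is resolved at compile time for each Processor type.
template<typename Processor>
int processAll(std::vector<int> const &items, Processor const &processor)
    {
    int total = 0;
    for(int item : items)
        { total += processor.process(item); }
    return total;
    }
```

The cost of this approach is that processAll is instantiated once per processor type, which pulls in the opposite direction from the template bloat reduction described earlier, so the two techniques have to be balanced.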