I'm very interested in the lockless single-producer-single-consumer multi-threading FIFO queue posited here, and referenced by a number of implementations:
http://msmvps.com/blogs/vandooren/archive/2007/01/05/creating-a-thread-safe-producer-consumer-queue-in-c-without-using-locks.aspx
The idea is to avoid both locks and CAS operations in the multithreading case where you have only a single producer and a single consumer (i.e., exactly 2 threads).
The problem is that the ring buffer can fill, and a produce() operation can fail, requiring the producer to either discard the operation or sit and spin until the consumer catches up.
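To make that failure mode concrete, here is a minimal sketch of a bounded single-producer/single-consumer ring whose produce() returns false when the buffer is full. The name SpscRing and the use of C++11 std::atomic with acquire/release ordering are my own assumptions for illustration; this is not the code from the linked article.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical sketch of a bounded SPSC ring (not the linked article's code).
// One slot is always left unused, so the usable capacity is N - 1.
template <class T, std::size_t N>
class SpscRing
{
    static_assert(N >= 2 && (N & (N - 1)) == 0, "N must be a power of two");

public:
    // Producer thread only. Returns false when the ring is full.
    bool produce(const T &element)
    {
        const std::size_t w = m_write.load(std::memory_order_relaxed);
        const std::size_t next = (w + 1) & (N - 1);
        if (next == m_read.load(std::memory_order_acquire))
            return false; // full: caller must discard the element or retry
        m_items[w] = element;
        m_write.store(next, std::memory_order_release); // publish the element
        return true;
    }

    // Consumer thread only. Returns false when the ring is empty.
    bool consume(T &out)
    {
        const std::size_t r = m_read.load(std::memory_order_relaxed);
        if (r == m_write.load(std::memory_order_acquire))
            return false; // empty
        out = m_items[r];
        m_read.store((r + 1) & (N - 1), std::memory_order_release); // free the slot
        return true;
    }

private:
    T m_items[N];
    std::atomic<std::size_t> m_read{0};
    std::atomic<std::size_t> m_write{0};
};
```

With N = 4, three produce() calls succeed and the fourth fails until the consumer removes an element, which is exactly the situation the producer has to handle by dropping or spinning.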
My idea to solve this is to instead use a doubly linked list of these ring buffers, using std::list. In the unlikely case where a ring buffer gets filled, a new ring buffer (of twice the size) is created, and added to the end of the linked list. After doing so, an "advance" flag is set on the previous ring buffer, which is used to notify the single consumer that once it has depleted its FIFO, it should discard it and move on to the next one.
My understanding of the generic std::list<> implementation is that this should be thread-safe: adding an element to the end of the list does not touch the members involved in removing an element from the front of the list, so long as there is at least one element in the list.
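The part of this I am sure about is the single-threaded guarantee: push_back on a std::list does not move or invalidate existing elements, so a pointer to the front element stays valid. The small sketch below checks only that property (front_survives_push_back is a hypothetical helper name). It says nothing about two threads; an implementation may, for example, keep a shared element count that both push_back and pop_front update, so the thread-safety assumption remains just that, an assumption.

```cpp
#include <cassert>
#include <list>

// Hypothetical helper: returns true if a pointer to the front element
// still refers to the same, unchanged object after push_back.
// This demonstrates reference stability only, not thread safety.
inline bool front_survives_push_back()
{
    std::list<int> chain;
    chain.push_back(1);
    int *front_before = &chain.front();

    chain.push_back(2); // existing nodes are neither moved nor invalidated

    return front_before == &chain.front() && *front_before == 1;
}
```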
Here is my sample code. I'd appreciate any logistical criticism of it. Please bear in mind:
this is intended for single-producer/single-consumer ONLY, and I don't need to be told it will break with multiple producers/consumers. :)
#include <cassert>
#include <cstddef>
#include <list>
#include <vector>

template<class T, std::size_t DEFAULT_SIZE = 64>
class PCQueue
{
public:
    PCQueue()
    {
        // Only works with power-of-two capacities.
        // The inner parentheses are required: == binds tighter than &.
        assert((DEFAULT_SIZE & (DEFAULT_SIZE - 1)) == 0);
        m_queues.push_back(QueueInfo());
        m_queues.front().items.resize(DEFAULT_SIZE);
    }

    // Producer thread only.
    void produce(const T &element)
    {
        QueueInfo *q = &m_queues.back();
        std::size_t next_idx = (q->write_idx + 1) & (q->items.size() - 1);
        if (next_idx == q->read_idx)
        {
            // Current queue is full; start a new queue that's twice as big
            QueueInfo *prev_queue = q;
            m_queues.push_back(QueueInfo());
            q = &m_queues.back();
            q->items.resize(prev_queue->items.size() << 1);
            next_idx = 1; // the new queue's write_idx starts at 0
            prev_queue->advance = true;
        }
        q->items[q->write_idx] = element;
        q->write_idx = next_idx;
    }

    // Consumer thread only. Returns NULL when nothing is available.
    T *next_consumed()
    {
        QueueInfo *q = &m_queues.front();
        if (q->read_idx == q->write_idx)
        {
            // The queue is empty
            if (q->advance)
            {
                // The producer has moved on; discard this queue and use the next one
                m_queues.pop_front();
                q = &m_queues.front();
                if (q->read_idx == q->write_idx)
                {
                    return NULL;
                }
            }
            else
            {
                return NULL;
            }
        }
        return &q->items[q->read_idx];
    }

    // Consumer thread only. Releases the element returned by next_consumed().
    void consume()
    {
        QueueInfo *q = &m_queues.front();
        q->read_idx = (q->read_idx + 1) & (q->items.size() - 1);
    }

private:
    typedef std::vector<T> QueueT;

    struct QueueInfo
    {
        QueueInfo() : read_idx(0), write_idx(0), advance(false)
        {
        }
        QueueT items;
        volatile std::size_t read_idx;
        volatile std::size_t write_idx;
        volatile bool advance;
    };

    typedef std::list<QueueInfo> Queues;

    Queues m_queues;
};
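The index arithmetic in produce() and consume() relies on every capacity being a power of two, so that masking with (capacity - 1) wraps the index. As a small sketch of just that arithmetic (is_power_of_two and next_index are hypothetical helper names, not part of the class above):

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: true if n is a non-zero power of two.
// The inner parentheses are required because == binds tighter than &;
// without them the expression parses as n & ((n - 1) == 0).
inline bool is_power_of_two(std::size_t n)
{
    return n != 0 && (n & (n - 1)) == 0;
}

// Hypothetical helper: advance a ring index by one, wrapping at capacity.
// Equivalent to (idx + 1) % capacity when capacity is a power of two.
inline std::size_t next_index(std::size_t idx, std::size_t capacity)
{
    return (idx + 1) & (capacity - 1);
}
```

Because each new ring buffer doubles the previous size, a power-of-two starting capacity keeps every later buffer a power of two as well.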
Thanks!
Dan.