A lockless, single-producer/consumer ring-FIFO that won't run out of space

I'm very interested in the lockless single-producer-single-consumer multi-threading FIFO queue posited here, and referenced by a number of implementations: http://msmvps.com/blogs/vandooren/archive/2007/01/05/creating-a-thread-safe-producer-consumer-queue-in-c-without-using-locks.aspx

The idea is to avoid both locks and CAS operations in the multithreading case where you have only a single producer and a single consumer (i.e. only 2 threads).

The problem is that the ring buffer can fill, and a produce() operation can fail, requiring the producer to either discard the operation or sit and spin until the consumer catches up.
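For context, a minimal sketch of the basic single-producer/single-consumer ring pattern (my own illustration of the idea from the linked article, not its exact code) shows where that failure mode comes from: one slot is always left empty to distinguish "full" from "empty", and produce() simply returns false when the ring is full.

```cpp
#include <cstddef>

// Minimal SPSC ring buffer sketch. SIZE must be a power of two so the
// index wrap can be done with a mask. One slot is kept empty so that
// read_idx == write_idx always means "empty", never "full".
template<class T, size_t SIZE = 64>
class RingFifo
{
public:
	RingFifo() : m_read(0), m_write(0) {}

	// Producer thread only. Returns false when the ring is full,
	// leaving the producer to drop the item or spin.
	bool produce(const T &element)
	{
		size_t next = (m_write + 1) & (SIZE - 1);
		if (next == m_read)
			return false;           // full
		m_items[m_write] = element;
		m_write = next;
		return true;
	}

	// Consumer thread only. Returns false when the ring is empty.
	bool consume(T &out)
	{
		if (m_read == m_write)
			return false;           // empty
		out = m_items[m_read];
		m_read = (m_read + 1) & (SIZE - 1);
		return true;
	}

private:
	T               m_items[SIZE];
	volatile size_t m_read;
	volatile size_t m_write;
};
```

Note that with the one-empty-slot convention a ring of SIZE slots holds at most SIZE - 1 elements before produce() starts failing.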

My idea to solve this is to instead use a doubly linked list of these ring buffers, using std::list. In the unlikely case where a ring buffer gets filled, a new ring buffer (of twice the size) is created, and added to the end of the linked list. After doing so, an "advance" flag is set on the previous ring buffer, which is used to notify the single consumer that once it has depleted its FIFO, it should discard it and move on to the next one.

My understanding of the generic std::list<> implementation is that this should be thread-safe (adding an element to the end of the list does not touch the members involved in removing an element from the front of the list, so long as there is at least one element in the list).

Here is my sample code. I'd appreciate any criticism of its logic. Please bear in mind: this is intended for single-producer/single-consumer ONLY, and I don't need to be told it will break with multiple producers/consumers. :)

#include <cassert>
#include <cstddef>
#include <list>
#include <vector>

template<class T, size_t DEFAULT_SIZE = 64>
class PCQueue
{
public:
	PCQueue()
	{
		// Note the parentheses: == binds tighter than &, so the
		// unparenthesized form would evaluate the wrong expression.
		assert((DEFAULT_SIZE & (DEFAULT_SIZE - 1)) == 0);  // Only works with power-of-two capacities
		m_queues.push_back(QueueInfo());
		m_queues.front().items.resize(DEFAULT_SIZE);
	}
	
	void produce(const T &element)
	{
		QueueInfo	*q = &m_queues.back();
	
		size_t	next_idx = (q->write_idx + 1) & (q->items.size() - 1);
		
		if (next_idx == q->read_idx)
		{
			// Current queue is full; start a new queue that's twice as big
			QueueInfo	*prev_queue = q;
			
			m_queues.push_back(QueueInfo());
			q = &m_queues.back();
			q->items.resize(prev_queue->items.size() << 1);
			next_idx = 1;
			prev_queue->advance = true;
		}
		
		q->items[q->write_idx] = element;
		q->write_idx = next_idx;
	}
	
	T *next_consumed()
	{
		QueueInfo	*q = &m_queues.front();
		
		if (q->read_idx == q->write_idx)
		{
			// The queue is empty
			if (q->advance)
			{
				m_queues.pop_front();
				q = &m_queues.front();
				
				if (q->read_idx == q->write_idx)
				{
					return NULL;
				}
			}
			else
			{
				return NULL;
			}
		}
		
		return &q->items[q->read_idx];
	}
	
	void consume()
	{
		QueueInfo	*q = &m_queues.front();
		
		q->read_idx = (q->read_idx + 1) & (q->items.size() - 1);
	}

private:
	typedef std::vector<T> QueueT;
	
	struct QueueInfo
	{
		QueueInfo() : read_idx(0), write_idx(0), advance(false)
		{
		}
	
		QueueT				items;
		volatile size_t		read_idx;
		volatile size_t		write_idx;
		volatile bool		advance;
	};
	
	typedef std::list<QueueInfo> Queues;
	
private:
	Queues		m_queues;
};



Thanks!

Dan.
Your code is based on false assumptions. It will only work on some versions of MSVS; it will not work on other compilers.

There's complex memory pipelining going on, and simply making a variable volatile does not ensure that writes/reads will occur in the order you expect. You need some kind of memory barrier that guarantees all previous writes have been committed, and all subsequent reads have not yet started.

volatile acts as a memory barrier in some implementations (some MSVS versions), but not all, so it's probably a bad idea to rely on it.

C++11's std::atomic class offers memory barrier functionality, but it isn't supported by MSVS yet.
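For illustration (my own sketch, not code from this thread): on a compiler that does support &lt;atomic&gt;, the index handoff in an SPSC ring would use a release store on the publishing side paired with an acquire load on the reading side, so the element write is guaranteed visible before the updated index is. The class and member names here are made up.

```cpp
#include <atomic>
#include <cstddef>

// SPSC ring buffer using C++11 atomics instead of volatile.
// The release store on m_write publishes the element written just before
// it; the consumer's acquire load on m_write pairs with that store, so
// after seeing the new index the consumer is guaranteed to see the data.
template<class T, size_t SIZE = 64>
class AtomicRingFifo
{
public:
	AtomicRingFifo() : m_read(0), m_write(0) {}

	bool produce(const T &element)   // producer thread only
	{
		size_t w    = m_write.load(std::memory_order_relaxed);
		size_t next = (w + 1) & (SIZE - 1);
		if (next == m_read.load(std::memory_order_acquire))
			return false;                               // full
		m_items[w] = element;                           // plain write...
		m_write.store(next, std::memory_order_release); // ...published here
		return true;
	}

	bool consume(T &out)             // consumer thread only
	{
		size_t r = m_read.load(std::memory_order_relaxed);
		if (r == m_write.load(std::memory_order_acquire))
			return false;                               // empty
		out = m_items[r];
		m_read.store((r + 1) & (SIZE - 1), std::memory_order_release);
		return true;
	}

private:
	T                   m_items[SIZE];
	std::atomic<size_t> m_read;
	std::atomic<size_t> m_write;
};
```

The relaxed loads are safe because each index is only ever written by one thread; the acquire/release pair is what replaces the ordering the volatile version was (incorrectly) assumed to provide.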

Read this for more info on the subject:

http://cbloomrants.blogspot.ca/2009/01/01-25-09-low-level-threading-junk-part.html


EDIT:

This paragraph in particular is why I really recommend reading the whole article:

Almost every single page I've read on this stuff gets it wrong. Even the experts. I always see stuff like this: they implement a lock-free fancy doohicky, and then come back later and admit that oh, it doesn't actually work. For example this Julian Bucknall guy has a lot of long articles about lock-free stuff, and then every 3rd article he comes back and goes "oh BTW the last article was wrong, oops". BTW never try to use any of the lock-free stuff from a place like "codeproject.com" or anybody's random blog.