Thread safe deque implementation

Hi, I am new to C++. I need to implement a deque that is shared by two threads. One thread will push data every 1 second using push_back(), and the other thread will pop data from the deque every 0.5 seconds using pop_front(). Both threads will run in an infinite loop.

Can anyone help me solve this?
You can do this in a basic way by putting a deque inside a class and controlling access to it with a mutex. Something like this. I haven't tried to compile it; it's just some example code to show how you can create thread-safe access around an existing container.
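#include <deque>
#include <mutex>

class threadSafeDeque
{
  public:
  // Pops from the front and gives you back the value that was there.
  // The deque must not be empty: front() on an empty deque is undefined behaviour.
  int pop_front()
  {
    std::lock_guard<std::mutex> lock(theMutex);
    int returnVal = theDeque.front();
    theDeque.pop_front();
    return returnVal;
  }
  // Appends a value at the back while holding the mutex.
  void push_back(int val)
  {
    std::lock_guard<std::mutex> lock(theMutex);
    theDeque.push_back(val);
  }
  private:
  std::deque<int> theDeque;
  std::mutex theMutex;
};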


This can be improved by making it templated (so it works with any type, not just int), and adding checks that the queue isn't empty when you go to fetch from it. I'm sure there are other improvements to make too, but start simple.
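
For what it's worth, here is a rough sketch of what those two improvements might look like. It assumes C++17 (for std::optional) and linking with your platform's thread library. The names ThreadSafeDeque, try_pop_front and run_demo are made up for this example. The demo runs a fixed number of iterations with intervals you pass in, rather than the infinite loop with 1 s and 0.5 s delays from the original question, so that it terminates.

```cpp
#include <chrono>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical templated version of the mutex-guarded deque.
template <typename T>
class ThreadSafeDeque {
public:
    void push_back(T val) {
        std::lock_guard<std::mutex> lock(mutex_);
        deque_.push_back(std::move(val));
    }

    // Returns std::nullopt when the deque is empty instead of
    // calling front() on an empty container.
    std::optional<T> try_pop_front() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (deque_.empty()) return std::nullopt;
        T val = std::move(deque_.front());
        deque_.pop_front();
        return val;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return deque_.size();
    }

private:
    mutable std::mutex mutex_;  // mutable so size() can lock in a const method
    std::deque<T> deque_;
};

// Demo helper (assumption: bounded run instead of an infinite loop).
// The producer pushes 0..count-1, sleeping push_interval after each push.
// The consumer polls every pop_interval until it has received count values.
std::vector<int> run_demo(int count,
                          std::chrono::milliseconds push_interval,
                          std::chrono::milliseconds pop_interval) {
    ThreadSafeDeque<int> dq;
    std::vector<int> received;  // written only by the consumer thread

    std::thread producer([&] {
        for (int i = 0; i < count; ++i) {
            dq.push_back(i);
            std::this_thread::sleep_for(push_interval);
        }
    });
    std::thread consumer([&] {
        while (static_cast<int>(received.size()) < count) {
            if (auto v = dq.try_pop_front()) received.push_back(*v);
            std::this_thread::sleep_for(pop_interval);
        }
    });

    producer.join();
    consumer.join();
    return received;
}
```

Calling run_demo(5, std::chrono::milliseconds(1000), std::chrono::milliseconds(500)) would mimic the timing in the question. Because the consumer polls twice as often as the producer pushes, roughly every other try_pop_front() call comes back empty, which is exactly the case the empty check is there for.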


Try this:
https://github.com/Konstantin-2/misc/blob/master/tsqueue.h

/* Copying and distribution of this file, with or without modification,
 * are permitted in any medium without royalty provided the copyright
 * notice and this notice are preserved.  This file is offered as-is,
 * without any warranty. */

#pragma once
#include <utility>
#include <atomic>
#include <queue>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <cassert>

/* Thread Safe Queue Template, C++17 */
template <typename T>
struct Tsqueue {
	/* Create Tsqueue object. Set maximum size of the queue to max_size. */
	inline Tsqueue(size_t max_size = -1UL) : maxsize(max_size), end(false) {};

	/* Push T to the queue. Many threads can push at the same time.
	 * If the queue is full, the calling thread is suspended until
	 * some other thread calls pop(). */
	void push(const T&);
	void push(T&&);

	/* Close the queue.
	 * Make sure all writing threads have finished their writes before calling this.
	 * Pushing data to a closed queue is forbidden. */
	void close();

	/* Pop and return T from the queue. Many threads can pop at the same time.
	 * If the queue is empty, the calling thread is suspended.
	 * If the queue is empty and closed, nullopt is returned. */
	std::optional<T> pop();
private:
	std::queue<T> que;
	std::mutex mtx;
	std::condition_variable cv_empty, cv_full;
	const size_t maxsize;
	std::atomic<bool> end;
};

/* Usage sample:
#include <thread>
#include <chrono>
#include <iostream>
#include "tsqueue.h"
using namespace std;
Tsqueue<int> q;
void foo()
{
	for (int i = 0; i < 4; i++) {
		q.push(i);
		this_thread::sleep_for(chrono::seconds(1));
	}
	q.close();
}
int main()
{
	thread t(foo);
	while(auto x = q.pop())
		cout << *x << '\n';
	t.join();
}
*/

template<typename T>
void Tsqueue<T>::push(T&& t)
{
	std::unique_lock<std::mutex> lck(mtx);
	while (que.size() == maxsize && !end)
		cv_full.wait(lck);
	assert(!end);
	que.push(std::move(t));
	cv_empty.notify_one();
}

template<typename T>
void Tsqueue<T>::push(const T& t)
{
	std::unique_lock<std::mutex> lck(mtx);
	while (que.size() == maxsize && !end)
		cv_full.wait(lck);
	assert(!end);
	que.push(t); /* t is a const reference, so this copies; std::move would not move here */
	cv_empty.notify_one();
}

template<typename T>
std::optional<T> Tsqueue<T>::pop()
{
	std::unique_lock<std::mutex> lck(mtx);
	while (que.empty() && !end)
		cv_empty.wait(lck);
	if (que.empty()) return {};
	T t = std::move(que.front());
	que.pop();
	cv_full.notify_one();
	return t;
}

template<typename T>
void Tsqueue<T>::close()
{
	end = true;
	std::lock_guard<std::mutex> lck(mtx);
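	/* Wake every waiter: with notify_one, other threads blocked in
	 * pop() or push() would never see that the queue was closed. */
	cv_empty.notify_all();
	cv_full.notify_all();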
}