semaphore mutex and a producer/consumer problem

My program is supposed to accept four numbers as command-line arguments, i.e. %executablefile num1 num2 num3 num4.
The code then uses num3 and num4 as the loop counts for creating the producer and consumer threads,
and the main thread then sleeps for num1 milliseconds.

Each producer/consumer then takes num2 as the upper bound on how long it sleeps (using a thread-safe sleep) before each attempt, so as to give the other threads a chance to get in.

The issue I am having is that I cannot seem to tell whether the consumer has caught up with the producer (or vice versa) in the circular buffer.

I can't see where my logic falls apart, though; any help is appreciated.





1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>
// Number of slots in the circular buffer.
#define BUFFER_SIZE 5


// Type of the values stored in the circular buffer.
typedef int buffer_item;

// Set to true by main() to ask every worker thread to leave its loop.
// Atomic because it is written by one thread and read by others without a lock.
std::atomic<bool> quit{false};

// Guards buffer_array, producer_position and consumer_position.
// Statically initialized so it is valid before any thread locks it.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// Counting semaphore; main() initializes it with sem_init().
sem_t buf;

// Scratch variable that receives the result of sem_getvalue().
int sem_value;

// Next slot to write (producer) and next slot to read (consumer).
int producer_position = 0, consumer_position = 0;

//  Initialize buffer: -1 marks a slot that has never been written.
//  NOTE: the initializer lists exactly five values; keep it in step with BUFFER_SIZE.
int buffer_array[BUFFER_SIZE] = {-1,-1,-1,-1,-1};
//std::cout << sem_value << std::endl; //prints number of semaphores left




// Stores item in the slot at producer_position, prints it, and advances
// producer_position by one slot, wrapping around at BUFFER_SIZE.
//
// This function does no locking and no full-buffer check of its own: the
// caller must hold the mutex and must already know that a slot is free.
//
// Returns true (the original fell off the end of a bool function, which is
// undefined behaviour in C++).
bool buffer_insert_item( buffer_item item )
{
	// producer_position is always kept in [0, BUFFER_SIZE) by the update below.
	buffer_array[producer_position] = item;
	std::cout << item << " at position " << producer_position << std::endl;
	producer_position = (producer_position + 1) % BUFFER_SIZE;
	return true;
}

// Returns true when value is a prime number.
// Uses trial division; values below 2 (including the -1 "empty" marker) are not prime.
static bool buffer_value_is_prime( int value )
{
	if (value < 2)
		return false;
	// divisor <= value / divisor is divisor*divisor <= value without overflow.
	for (int divisor = 2; divisor <= value / divisor; ++divisor)
	{
		if (value % divisor == 0)
			return false;
	}
	return true;
}

// Prints the item in the slot at consumer_position (flagging it when it is a
// prime number) and advances consumer_position by one slot, wrapping around
// at BUFFER_SIZE.
//
// This function does no locking and no empty-buffer check of its own: the
// caller must hold the mutex and must already know that an item is available.
//
// Returns true (the original fell off the end of a bool function, which is
// undefined behaviour in C++). The original also reported every odd value as
// "prime"; the check is now a real primality test.
bool buffer_remove_item()
{
	// consumer_position is always kept in [0, BUFFER_SIZE) by the update below.
	const int item = buffer_array[consumer_position];
	if (buffer_value_is_prime(item))
	{
		std::cout << item << " is a prime number at position " << consumer_position << std::endl;
	}
	else
	{
		std::cout << item << " at position " << consumer_position << std::endl;
	}
	consumer_position = (consumer_position + 1) % BUFFER_SIZE;
	return true;
}





void* producer(void *argv)
{
	int holder = atoi((char *)argv);
	std::cout << "producer start"	<< std::endl;
	unsigned int seed;
	seed = time(NULL);
	//this is for each thread outputting both random and unique numbers
	std::stringstream X;
	X << std::this_thread::get_id();
	int id = std::stoull(X.str());
	std::cout << id << std::endl;
	while(quit != true)
	{
		//seeding random
		//adding the id to the number generated makes it random even to parallel threads
		buffer_item random_num = std::abs(((rand_r(&seed) + id) % 100)+1);
		holder = (rand_r(&seed) % holder) + 1;
		std::this_thread::sleep_for (std::chrono::milliseconds(holder));
		sem_getvalue(&buf, &sem_value); //sets sem_value to how many semaphores are left
		if (sem_value > 0 && sem_value <= 5)
		{
			//we lock the mutext first so we dont have a semaphore waiting to lock
			/* aquire the mutex lock */
			pthread_mutex_lock( &mutex );
			
			//hold semaphore
			sem_wait( &buf );
			
			/*** CRITICAL SECTION ***/
			std::cout << "inside the producer thread" << std::endl;
			std::cout << sem_value << " this is the number of semaphore locks remaining" << std::endl;
			//to have each number be random
			while (random_num == std::abs(((rand_r(&seed) + id) % 100)+1))
			{
				random_num = std::abs(((rand_r(&seed) + id) % 100)+1);
			}
			//std::cout<< random_num << std::endl;
			buffer_insert_item(random_num);
			
			//release the semaphore
			//sem_post( &buf );
			/* release the mutex lock */ 
			pthread_mutex_unlock( &mutex );
		}
		//else if(sem_value == 0)
		else if (producer_position  == consumer_position)
		{
			std::cout << "buffer full" << std:: endl;
			//print out buffer full
			std::this_thread::sleep_for (std::chrono::milliseconds(holder));
		}
	}
	
}

// Consumer thread entry point (pthread start routine).
//
// argv: C string holding the maximum sleep time in milliseconds; values
//       below 1 (including unparsable text) are treated as 1.
// Returns nullptr when `quit` becomes true.
//
// Each iteration sleeps for a random time in [1, max], then tries to take one
// item out of the circular buffer and gives its slot back with sem_post().
//
// Telling "empty" from "full": consumer_position == producer_position holds
// in BOTH cases, so the positions alone are ambiguous. The semaphore counts
// free slots, so with equal positions the buffer is empty exactly when at
// least one slot is free. Both values are read while holding the mutex,
// which is also held for every insert/remove and its semaphore update, so
// they are consistent with each other.
void* consumer(void *argv)
{
	int max_sleep_ms = atoi(static_cast<char *>(argv));
	if (max_sleep_ms < 1)
	{
		max_sleep_ms = 1; // avoids "% 0" below
	}
	std::cout << "consumer start"	<< std::endl;

	// Mix the thread id into the seed so threads started in the same second
	// still sleep for different times.
	const std::thread::id self = std::this_thread::get_id();
	unsigned int seed = static_cast<unsigned int>(time(NULL)) ^
			static_cast<unsigned int>(std::hash<std::thread::id>()(self));
	std::cout << self << std::endl;

	while (!quit)
	{
		// The upper bound stays fixed; only the per-iteration nap varies.
		const int nap_ms = (rand_r(&seed) % max_sleep_ms) + 1;
		std::this_thread::sleep_for (std::chrono::milliseconds(nap_ms));

		bool removed = false;

		/* aquire the mutex lock */
		pthread_mutex_lock( &mutex );
		int free_slots = 0;
		sem_getvalue(&buf, &free_slots); // number of free slots right now
		const bool buffer_empty =
				(consumer_position == producer_position) && (free_slots > 0);
		if (!buffer_empty)
		{
			/*** CRITICAL SECTION ***/
			std::cout << "inside the consumer thread" << std::endl;
			std::cout << free_slots << " this is the number of semaphore locks remaining" << std::endl;
			buffer_remove_item();

			//release the semaphore: the slot just read is free again
			sem_post( &buf );
			removed = true;
		}
		/* release the mutex lock */
		pthread_mutex_unlock( &mutex );

		if (!removed)
		{
			std::cout << "buffer empty" << std::endl;
			// back off so producers get a chance to add an item
			std::this_thread::sleep_for (std::chrono::milliseconds(nap_ms));
		}
	}

	return nullptr;
}


// Program entry point.
//
// Usage: <program> <run_ms> <max_sleep_ms> <num_producers> <num_consumers>
//   run_ms         how long the main thread lets the workers run
//   max_sleep_ms   upper bound handed to every worker for its random sleep
//   num_producers  number of producer threads to start
//   num_consumers  number of consumer threads to start
//
// Returns EXIT_FAILURE on bad arguments or if the semaphore cannot be
// created, 0 otherwise.
int main( int argc, char *argv[] )
{
	if (argc < 5)
	{
		std::cerr << "usage: " << (argc > 0 ? argv[0] : "program")
				<< " <run_ms> <max_sleep_ms> <num_producers> <num_consumers>" << std::endl;
		return EXIT_FAILURE;
	}

//  Get command line arguments
	const int time_term = atoi(argv[1]);
	const int time_sleep = atoi(argv[2]);
	const int num_pro = atoi(argv[3]);
	const int num_con = atoi(argv[4]);
	std::cout << time_term << std::endl <<
			time_sleep << std::endl << num_pro <<
			std::endl << num_con << std::endl;

	// atoi() yields 0 for unparsable text, so these checks also reject garbage.
	if (time_term < 0 || time_sleep < 1 || num_pro < 0 || num_con < 0)
	{
		std::cerr << "arguments must be non-negative and max_sleep_ms must be at least 1" << std::endl;
		return EXIT_FAILURE;
	}

	// The semaphore counts free buffer slots; 5 must match BUFFER_SIZE.
	if (sem_init( &buf, 0, 5 ) != 0)
	{
		std::cerr << "sem_init failed" << std::endl;
		return EXIT_FAILURE;
	}

	// Keep every thread handle: reusing one pthread_t would leave all but the
	// last thread unjoined.
	std::vector<pthread_t> workers;
	workers.reserve(static_cast<std::size_t>(num_pro) + static_cast<std::size_t>(num_con));

//  Create producer thread(s)
	for(int i = 0; i < num_pro; i++)
	{
		pthread_t handle;
		if (pthread_create(&handle, nullptr, producer, argv[2]) == 0)
			workers.push_back(handle);
		else
			std::cerr << "failed to create producer thread " << i << std::endl;
	}
//  Create consumer thread(s)
	for(int i = 0; i < num_con; i++)
	{
		pthread_t handle;
		if (pthread_create(&handle, nullptr, consumer, argv[2]) == 0)
			workers.push_back(handle);
		else
			std::cerr << "failed to create consumer thread " << i << std::endl;
	}
//  Sleep
	std::cout << "start sleep" << std::endl;
	std::this_thread::sleep_for (std::chrono::milliseconds(time_term));
	std::cout << "end sleep" << std::endl;
//  Join Threads
	quit = true; //setting quit value to exit threads
	std::cout << "waiting for threads to quit" << std::endl;
	for (pthread_t handle : workers)
	{
		pthread_join(handle, nullptr);
	}
//  Display Statistics
//  Exit
	sem_destroy( &buf );
	return 0;
}
cannot seem to be able to tell if the consumer has caught up with the producer and vice versa in a circular wait buffer

Expanding on a previous reply, here is a suggestion to use two std::atomic<bool> variables, one for the producer and one for the consumer:
previous reply: http://www.cplusplus.com/forum/beginner/218307/#msg1007225
expanded suggestion for current case:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# include <iostream>
# include <atomic>
# include <future>
# include <thread>
# include <chrono>

// Handshake flags shared between producer() and consumer().
// Atomic objects are always given an explicit initial value, because the
// default constructor does not fully initialize them.
// The producer goes first, hence the consumer starts out as "done" and the
// producer starts out as "not ready".
std::atomic<bool> bool_ConsumerDone(true);
std::atomic<bool> bool_ProducerReady(false);

// Waits until the consumer reports that it is done, then marks the producer
// as ready. The wait polls bool_ConsumerDone every 500 ms; at program launch
// the flag is already true, so the producer starts straight away.
void producer()
{
    const std::chrono::milliseconds poll_interval(500);
    for (;;)
    {
        if (bool_ConsumerDone.load())
        {
            break;
        }
        std::this_thread::sleep_for(poll_interval);
    }
    // the consumer is done: do whatever the producer needs to do here

    // store() assigns a new value
    bool_ProducerReady.store(true);
}

// Waits until the producer reports that it is ready, then clears
// bool_ConsumerDone while the consumer works and sets it again afterwards.
// The wait polls bool_ProducerReady every 500 ms; at program launch the flag
// is false, so the consumer waits.
void consumer()
{
    const std::chrono::milliseconds poll_interval(500);
    // load() returns the current value
    for (;;)
    {
        if (bool_ProducerReady.load())
        {
            break;
        }
        std::this_thread::sleep_for(poll_interval);
    }

    // clearing the flag is what holds the producer off
    bool_ConsumerDone.store(false);
    // do whatever the consumer needs to do here

    bool_ConsumerDone.store(true);
}

// Entry point of the handshake example. No body is given here; the comment
// below points to an earlier post for it.
int main()
{
   //same as case (b) (nb: see previous post)
}
Topic archived. No new replies allowed.