Need help w/ BUG in Consumer Producer lab

hey guys, for one of my classes, we're learning semaphores and pthreads.
So, I wrote everything and the code is compiling and running, however, I've got a bug that I can't find.
When I run the program, it looks like the mutex lock doesn't work properly. Some of the consumers try to consume numbers that were already consumed!!!
Do you guys think you could help me find the problem??

Compile with: g++ myLab.cpp -l pthread -o lab
Run with: ./lab 1 1 1

Here's my code:
- Dropbox - myLab.cpp - https://www.dropbox.com/s/au1d6ms7jji19ay/myLab.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
#include <boost/lexical_cast.hpp> // lexical_cast
#include <cerrno>  // errno
#include <cstdio>  // perror
#include <cstdlib> // atoi, exit,
#include <iomanip> // setw
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h> // usleep

using namespace std;

// make the Buffer hold integers
typedef int bufferItem;

/******************************************************************************
*
******************************************************************************/
template <class T>
class CircularQueue
{
   public:
      CircularQueue(T* buffer);
      T front()        const { return this->buffer[head]; }
      T back()         const { return this->buffer[tail]; }
      int size()       const { return numItems; }
      int getMaxSize() const { return maxSize; }
      bool full()      const { return numItems == maxSize; }
      bool empty()     const { return numItems == 0; }

      bool enqueue(T  item);        // add an item to the back of the queue
      bool dequeue(T* item = NULL); // remove from the front of the queue

      void display() const;

   private:
      int maxSize;
      int numItems;
      int head;
      int tail;
      T* buffer;
};

// shared memory ---- GLOBAL variables
const int BUFFER_SIZE = 5;
bufferItem buffer[BUFFER_SIZE];
CircularQueue<bufferItem> cq(buffer);
pthread_mutex_t mutex;
sem_t empty;
sem_t full;

// functions
void checkArgs(int argc, char** argv);
void initialize();
void* producer(void* threadID);
void* consumer(void* threadID);
void exitError(string msg, int errCode = 0);

/******************************************************************************
* main()
* - check and get the command line arguments
* - initialize the buffer, mutex lock and semaphores
* - create the producer threads
* - create the consumer threads
* - sleep
* - cancel the threads
* - exit
******************************************************************************/
int main(int argc, char** argv)
{
   // check and assign the args
   checkArgs(argc, argv);
   int sleepTime   = atoi(argv[1]);
   int numProducer = atoi(argv[2]);
   int numConsumer = atoi(argv[3]);

   // initialize the buffer, mutex lock, and semaphores
   initialize();
   pthread_t producers[numProducer]; // keep track of the threads
   pthread_t consumers[numConsumer]; // keep track of the threads
   int r = 0; // used for checking the return codes
   int* threadID = NULL;

   cout << "Produced by P# Consumed by C#" << endl
        << "======== ===== ======== =====" << endl;

   // create the producer threads
   for (int i = 0; i < numProducer; i++)
   {
      threadID = new int(i + 1);
      r = pthread_create(&producers[i], NULL, producer, (void*) threadID);
      if (r)
         exitError("Failed to create producer thread", r);
   }

   // create the consumer threads
   for (int i = 0; i < numConsumer; i++)
   {
      threadID = new int(i + 1);
      r = pthread_create(&consumers[i], NULL, consumer, (void*) threadID);
      if (r)
         exitError("Failed to create consumer thread", r);
   }

   // ZZzZZZzzZZZzz
   sleep(sleepTime);

   // cancel the threads
   for (int i = 0; i < numProducer; i++)
   {
      r = pthread_cancel(producers[i]);
      if (r)
         exitError("Failed to cancel producer thread", r);
   }

   for (int i = 0; i < numConsumer; i++)
   {
      r = pthread_cancel(consumers[i]);
      if (r)
         exitError("Failed to cancel consumer thread", r);
   }

   // destroy the mutex lock and the semaphores
   r = pthread_mutex_destroy(&mutex);
   if (r)
      exitError("Failed to destroy the mutex lock", r);
   r = sem_destroy(&empty);
   if (r)
      exitError("Failed to destroy the 'empty' semaphore", r);
   r = sem_destroy(&full);
   if (r)
      exitError("Failed to destroy the 'full' semaphore", r);

   return 0;
}


/******************************************************************************
* initialize()
* - initializes the mutex lock, semaphores and the buffer
******************************************************************************/
void initialize()
{
   int r; // stores the resulting return value

   // init the mutex lock w/ default attribs.
   r = pthread_mutex_init(&mutex, NULL);
   if (r)
      exitError("Failed to initialize the mutex lock", r);

   // sem_init(ptr_to_sem, sharing_lvl, semaphore_initial_value)
   // init the EMPTY SEMAPHORE
   r = sem_init(&empty, 0, BUFFER_SIZE);
   if (r)
      exitError("Failed to initilize the 'empty' semaphore", r);

   // init the FULL SEMAPHORE
   r = sem_init(&full, 0, 0);
   if (r)
      exitError("Failed to initilize the 'full' semaphore", r);

   // init the Buffer
   for (int i = 0; i < BUFFER_SIZE; i++)
      buffer[i] = 0;
}

/******************************************************************************
* producer()
* - generates pseudorandom numbers and pushes them to the queue
******************************************************************************/
void* producer(void* threadID)
{
   int id = *(int*)threadID;
   delete (int*) threadID;
   threadID = NULL;
   bufferItem itemProduced = -1;
   int r1;
   int r2;

   while (true)
   {
      usleep(rand() % 150000); // sleep a random amount of time
      itemProduced = (rand() % 1000); // use 3-digit numbers 0-999

      // acquire the semaphore and the mutex lock
      r1 = sem_wait(&empty);
      r2 = pthread_mutex_lock(&mutex);
      if (r1)
         exitError("Failed to acquire the `empty` semaphore", r1);
      if (r2)
         exitError("Failed to acquire the mutex lock", r2);

      // insert item into shared global buffer and print what was done
      if (cq.enqueue(itemProduced))
      {
         cout << setw(5) << right << itemProduced
              << setw(6) << 'P' << left << id << endl;
      }
      else
         exitError("Failed to produce a new item");

      r1 = pthread_mutex_unlock(&mutex); // release the krakens!
      r2 = sem_post(&full); // increment the 'full' semaphore
      if (r1)
         exitError("Failed to release the mutex lock", r1);
      if (r2)
         exitError("Failed to increment the number of full slots", r2);
   }
   return NULL;
}

/******************************************************************************
* consumer()
* - consumes numbers from the queue
******************************************************************************/
void* consumer(void* threadID)
{
   int id = *(int*)threadID;
   delete (int*) threadID;
   threadID = NULL;
   bufferItem itemConsumed = -1;
   int r1;
   int r2;

   while (true)
   {
      usleep(rand() % 150000); // sleep a random amount of time

      r1 = sem_wait(&full);
      r2 = pthread_mutex_lock(&mutex);
      if (r1)
         exitError("Failed to acquire the `full` semaphore", r1);
      if (r2)
         exitError("Failed to acquire the mutex lock", r2);

      // consume item from shared global buffer and print what was done
      if (cq.dequeue(&itemConsumed))
      {
         cout << setw(20) << right << itemConsumed
              << setw(7) << 'C' << left << id << endl;
      }
      else
         exitError("Failed to consume an item");

      r1 = pthread_mutex_unlock(&mutex); // release the krakens!
      r2 = sem_post(&empty);
      if (r1)
         exitError("Failed to release the mutex lock", r1);
      if (r2)
         exitError("Failed to increment the number of empty slots", r2);

   }
   return NULL;
}


Last edited on
Since I couldn't post the whole code in a single post, here's the rest of the code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/******************************************************************************
* checkArgs
* - checks that all arguments were positive integers
* - it exits the program if it didn't pass the check
******************************************************************************/
void checkArgs(int argc, char** argv)
{
   bool ok = false;
   if (argc == 4)
   {
      ok = true;
      for (int i = 1; i < 4; i++) // check that all args are positive ints
         if (atoi(argv[i]) <= 0)
         {
            ok = false;
            break;
         }
   }

   if (!ok)
   {
      cout << "Usage: \t" << argv[0]
           << " <sleep_time> <num_producer> <num_consumer>\n";
      cout << "\tEach argument must be a positive integer.\n";
      exit(1);
   }
}

/******************************************************************************
* exitError()
* - outputs the error message to STDERR and exit the program
******************************************************************************/
void exitError(string msg, int errCode)
{
   msg.insert(0, "Error: ");
   if (errCode) // prepend the errorCode, if there's one
   {
      string prepend = "(";
      prepend += boost::lexical_cast<string>(errCode);
      prepend += ") - ";
      msg.insert(7, prepend);
   }

   if (errno) // if errno is set, then use it
      perror(msg.c_str());
   else
      cerr << msg << endl;

   exit(1);
}

/******************************************************************************
* CircularQueue constructor
******************************************************************************/
template <class T>
CircularQueue<T>::CircularQueue(T* buffer)
: head(0), tail(0), numItems(0), buffer(buffer)
{                         // not sure why this didn't compute right...
   maxSize = BUFFER_SIZE; // maxSize = sizeof(buffer) / sizeof(T);
}

/******************************************************************************
* enqueue()
* - pushes an item to the end of the queue
******************************************************************************/
template <class T>
bool CircularQueue<T>::enqueue(T item)
{
   if (!full())
   {
      buffer[tail++] = item;
      tail %= maxSize;
      numItems++;
      return true;
   }
   return false;
}

/******************************************************************************
* dequeue()
* - removes an item from the front of the list
******************************************************************************/
template <class T>
bool CircularQueue<T>::dequeue(T* item)
{
   if (!empty())
   {
      if (item)
         *item = buffer[head];
      head %= (head + 1) % maxSize;
      numItems--;
      return true;
   }
   return false;
}

/******************************************************************************
* display()
* - displays the queue from front to back
******************************************************************************/
template <class T>
void CircularQueue<T>::display() const
{
   int i = head;
   for (int counter = 0; counter < numItems; counter++)
   {
      cout << this->buffer[i] << " ";
      i = (i + 1) % maxSize;
   }
   cout << endl;
}



Expected Output (the numbers will change)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Produced  by P#  Consumed  by C#
========  =====  ========  =====
  777     P1
                   777     C1
  335     P1
                   335     C1
  649     P1
                   649     C1
   27     P1
                    27     C1
  763     P1
                   763     C1
  426     P1
                   426     C1
  211     P1
  567     P1
  782     P1
                   782     C1
  123     P1
                   123     C1
  929     P1
   22     P1
   69     P1
                    69     C1
                    22     C1
   11     P1
  229     P1
                   229     C1


Actual Output (you will notice that some Consumers will be consuming items that were already consumed)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Produced by P# Consumed by C#
======== ===== ======== =====
  777     P1
                 777      C1
  335     P1
                 777      C1
  649     P1
                 777      C1
   27     P1
                 777      C1
  763     P1
                 777      C1
  426     P1
                 426      C1
  211     P1
  567     P1
  782     P1
                 426      C1
  123     P1
                 426      C1
  929     P1
   22     P1
   69     P1
                 929      C1
                 929      C1
   11     P1
  229     P1
                 929      C1
head %= (head + 1) % maxSize;

Say head is 0 and maxSize is 5. What is the result of this operation?
OMG that was it! :D
the correct line of code is:
head = (head + 1) % maxSize;

You're awesome! You don't know how long I've spent trying to find the problem. Thanks!!!
Last edited on
Topic archived. No new replies allowed.