Producer/Consumer Shared File

Hi all,

I have been working on this little introductory project to the Producer/Consumer threading concept. For this project, I was required to share a common file resource between the two threads. One of the threads would write lines of output to the file (producer) and the other thread would read a line from the file and mark that line read with an asterisk at the end using file pointer operations(Consumer). The producer thread would then sleep for 300 milliseconds while the consumer thread would sleep for 600 milliseconds (2 writes for every 1 read essentially). This is essentially the gist of what I am supposed to do.

So far, I've written the program and gotten the output from the program that I am supposed to get. My problem though is that the lines of output in the file all appear clustered on one line like so (in this example output where I've written ten lines to file and read 9 of those lines):

Line Number# is1*Line Number# is2*Line Number# is3*Line Number# is4*Line Number# is5*Line Number# is6*Line Number# is7*Line Number# is8*Line Number# is9*Line Number# is10

when ideally I would like the file output to appear as:

Line Number# is1*
Line Number# is2*
Line Number# is3*
Line Number# is4*
Line Number# is5*
Line Number# is6*
Line Number# is7*
Line Number# is8*
Line Number# is9*
Line Number# is10


Quite frankly I don't quite understand why my initial output doesn't already show up as this. Can anyone help to shed light on this?
Here is my code (has a system("pause") call at the end of main):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
/*
 * File acts as a FIFO queue
 *
 * Each line in the file will have an asterisk appended on it to the end to
 * signify that it has been read or "deleted".
 *
 * File write 20 example lines to a file named fifo.txt
 *
 * This program leaves at least one record in the file unread (i.e. the last record)
 */
 
//----------------
// Include Section 
//----------------
#include <cstdlib>
#include <iostream>
#include <windows.h>
#include <stdio.h>
#include <fstream>
#include <string>
using namespace std;

#define THREADCOUNT 2

HANDLE ghMutex;
int MAX_COUNT = 20; //max times each thread will execute

DWORD WINAPI WriteToFile(LPVOID);
DWORD WINAPI ReadFromFile(LPVOID);

//---------------------------
//  M A I N   P R O G R A M
//---------------------------

int main( void )
{
    //create an array of thread handles
    HANDLE aThread[THREADCOUNT];
    //create a thread identifier
    DWORD ThreadID;
    int i;

    ghMutex = CreateMutex( 
        NULL,              // default security attributes
        FALSE,             // initially not owned
        NULL);             // unnamed mutex

    if (ghMutex == NULL) 
    {
        printf("CreateMutex error: %d\n", GetLastError());
        return 1;
    }

    // Create worker threads:
    //-----------------------
        //1. Producer Thread
        //-------------------
        aThread[0] = CreateThread( 
                         NULL,       // default security attributes
                         0,          // default stack size
                         WriteToFile,//function this thread will use
                         NULL,      //thread function arguments
                         0,          // default creation flags
                         &ThreadID); // receive thread identifier
    
        if( aThread[0] == NULL )
        {
            printf("CreateThread error: %d\n", GetLastError());
            return 1;
        }
        
        //2. Consumer threads
        //-------------------
        aThread[1] = CreateThread( 
                     NULL,       // default security attributes
                     0,          // default stack size
                     ReadFromFile,//function this thread will use
                     NULL,       //thread function arguments
                     0,          // default creation flags
                     &ThreadID); // receive thread identifier
    
        if( aThread[1] == NULL )
        {
            printf("CreateThread error: %d\n", GetLastError());
            return 1;
        }

    // Wait for all threads to terminate
    WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);

    // Close thread, mutex, and file handles
    for( i=0; i < THREADCOUNT; i++ )
        CloseHandle(aThread[i]);
        
    CloseHandle(ghMutex);
    
    system("pause");
    return 0;
}

//-----------------------------------------
//  Producer Thread: WriteToFile Function
//----------------------------------------- 
DWORD WINAPI WriteToFile(LPVOID file)
{
    ofstream writeFile;
    DWORD dwCount=0, dwWaitResult;

    while( dwCount < MAX_COUNT )
    { 
        //Wait until the specified object is in the 
        //signaled state or the time-out interval elapses.
        //see: http://msdn.microsoft.com/en-us/library/ms687032%28v=vs.85%29.aspx
        dwWaitResult = WaitForSingleObject( 
            ghMutex,    // handle to mutex*
            INFINITE);  // no time-out interval
    
        switch (dwWaitResult) 
        {
            // The thread got ownership of the mutex
            case WAIT_OBJECT_0: 
                dwCount++;
                printf("Producer: Thread ID# %d, and the count is %d\n",
                    GetCurrentThreadId(), dwCount);
                /*
                ********************************************************
                */
                writeFile.open("fifo.txt", ios::app);
                writeFile << "Line Number# is" << dwCount << "\n";
                writeFile.close();
                /*
                ********************************************************
                */ 
                           
                //Releases ownership of the specified mutex object.
                //the parameter is the handle to the mutex object.
                //When a thread is done with the mutex object,
                //the thread should release the mutex to be used by
                //another thread
                ReleaseMutex(ghMutex);
                Sleep(300);
                
                break; 

            // The thread got ownership of an abandoned mutex
            // This is not good.
            case WAIT_ABANDONED: 
                return FALSE; 
        }
    }
    return TRUE; 
}

//-----------------------------------------
//  Consumer Thread: ReadFromFile Function
//-----------------------------------------
DWORD WINAPI ReadFromFile(LPVOID file)
{
    
    fstream readFile;
    char marker = '*';
    string buffer;
    int lineCount = 1;
    
    DWORD dwCount=0, dwWaitResult; 
   
    // Request ownership of mutex.
    while( dwCount < MAX_COUNT-1 )
    { 
        //Wait until the specified object is in the 
        //signaled state or the time-out interval elapses.
        //see: http://msdn.microsoft.com/en-us/library/ms687032%28v=vs.85%29.aspx
        dwWaitResult = WaitForSingleObject( 
            ghMutex,    // handle to mutex*
            INFINITE);  // no time-out interval
       
        switch (dwWaitResult) 
        {
            // The thread got ownership of the mutex
            case WAIT_OBJECT_0: 
                dwCount++;
            
                printf("Consumer: Thread ID# %d, and the count is %d\n",
                    GetCurrentThreadId(), dwCount);
                
                readFile.open("fifo.txt");
                if (!readFile.is_open())    //text file has not been created
                {
                    cout << "Error!!\n";
                    readFile.clear();
                }
                else    //it exists
                {
                    readFile.seekg(0, ios::beg);
                    int fileBytes = 0;
                    while(readFile)
                    {
                        getline(readFile, buffer); 
                        printf("File: Line Number %d was marked as read\n", lineCount);
                        printf("-------------------------------------------\n");
                        
                        fileBytes += buffer.length();   //used for offsetting the put pointer
                        string sub = buffer.substr(buffer.length()-1,buffer.length());
                        int asterisk = sub.find('*');   //look for the asterisk at the end of the line
                        if (asterisk == -1) 
                        {
                            //place the asterisk at the end of the line to signify it was read
                            readFile.seekp(fileBytes+1, ios::beg);
                            readFile.put(marker);
                            readFile.seekp(0, ios::end);
                            break;    
                        }
                    }
                    lineCount++;
                    readFile.close(); 
                }  
                      
                /*
                *****************************************************************
                */ 
                           
                //Releases ownership of the specified mutex object.
                //the parameter is the handle to the mutex object.
                //When a thread is done with the mutex object,
                //the thread should release the mutex to be used by
                //another thread
                ReleaseMutex(ghMutex);
                Sleep(600);
                
                break; 

            // The thread got ownership of an abandoned mutex
            // This is not good.
            case WAIT_ABANDONED: 
               return FALSE; 
        }
    }
    return TRUE; 
}
Topic archived. No new replies allowed.