How can I add reading from stdin functionality to my multithreaded app?

Hello,

I have a two-threaded app (developed with some help around here) that reads quotes (prices) over sockets in the receive_quotes thread and does some calculations on those quotes in the process_quotes thread. In the code below, for simplicity's sake, I replaced reading quotes over sockets with randomly generated quotes.

I need to add functionality to read input from stdin so that I can pass commands to my app. I am not sure how to do it. Should I add another thread that reads from stdin?

EDIT: I added a global mean_vec vector that I fill with the mean values calculated in the process_quotes thread. I want to be able to edit those values from the command line, so I enter an index on stdin and, if that element already exists, I multiply it by 10. So in production code I will be adding elements to the vector in the process_quotes thread but only editing them in the main thread. I updated my code. Do you think it's thread safe?

I'll appreciate your ideas, TIA!

// g++ -g -std=c++0x -pthread mthread_test.cpp -o test

//
// mthread_test.cpp
//
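#include<iostream>
#include<string>
#include<deque>
#include<vector>
#include<cstdlib>   	// rand(), atoi()
#include<fstream>
#include<pthread.h>	// pthread_create(), mutexes, condition variables
#include<unistd.h>	// unlink()
#include<time.h>	// nanosleep()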
using namespace std;

string datafile_name = "z_test_quotes.log";
string processfile_name = "z_test_process.log";
pthread_t quote_thread, process_thread;
pthread_mutex_t quotes_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  quote_condition = PTHREAD_COND_INITIALIZER;
deque<float> quotes_deque;
bool newquote_flag = false;

// EDIT: ADDED 
pthread_mutex_t mean_mutex = PTHREAD_MUTEX_INITIALIZER;
vector<float> mean_vec;
// END

void* thfun_receive_quotes(void* arg);
void* thfun_process_quotes(void* arg);
void my_err(string str);
void my_nsleep(int nanoseconds);
int my_log(string file_name, string str);
float calc_mean(deque<float>& quotes);
int Run();

int main()
{
    Run();
    return 0;
}

int Run()
{
    my_err(" ******  starting process... ****** ");
    unlink(datafile_name.c_str());
    unlink(processfile_name.c_str());

    if ( pthread_create(&quote_thread, NULL, thfun_receive_quotes, NULL) != 0 )
        my_err(" error creating thread quote_thread ");

    if ( pthread_create(&process_thread, NULL, thfun_process_quotes, NULL) != 0 )
        my_err(" error creating thread process_thread ");

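    // EDIT: Added code to read from stdin.
    string input_line;
    int idx = 0;
    while( getline(cin, input_line) )		// Stop at EOF or on a read error.
    {
        idx = atoi(input_line.c_str());
        cout << idx << endl;

        // UPDATE mean_vec. Hold mean_mutex here too, because the process
        // thread may push_back (and reallocate the vector) at the same time.
        pthread_mutex_lock(&mean_mutex);
        if( idx >= 0 && (size_t)idx < mean_vec.size() )	// Valid indices are 0..size-1.
        {
            cout << mean_vec.at(idx) << endl;
            mean_vec.at(idx) = mean_vec.at(idx) * 10;
            cout << mean_vec.at(idx) << endl;
        }
        pthread_mutex_unlock(&mean_mutex);
        // END OF UPDATE
    }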

    pthread_join( quote_thread, NULL);
    pthread_join( process_thread, NULL);

    my_err(" ******  terminating process ******* ");
    return 0;
}

// thread function receive.
void* thfun_receive_quotes(void* arg)
{
    my_err(" ******  starting receive thread ****** ");
    float quote;

    for(unsigned long i=0; i<500; i++)
    {
        my_nsleep(100000000);   			// Simulate sockets delay with sleep 100 milli seconds.

        quote = rand() / (float)RAND_MAX;       	// Generate random number.

        pthread_mutex_lock( &quotes_mutex );		// Lock.
        quotes_deque.push_back(quote);			// Save quote into a global deque.		
        newquote_flag = true;
        pthread_mutex_unlock( &quotes_mutex );		// Unlock.
        pthread_cond_signal( &quote_condition );	// Send wake up signal to process thread.
    }

    pthread_cancel(process_thread);

    my_err(" ******  terminating receive thread ****** ");
    return ((void *) 0); 
}

// thread function process.
void* thfun_process_quotes(void* arg)
{
    my_err(" ******  starting process thread ****** ");
    int size = 0;
    string str;
    deque<float> quotes;
    float mean = 0;	

    for(unsigned long j=0; ; j++)
    {
        pthread_mutex_lock( &quotes_mutex );	// Lock.

        // Prevent 'spurious wakeup'.
        while(  !newquote_flag )
            pthread_cond_wait( &quote_condition, &quotes_mutex );
        
	quotes = quotes_deque;			// Copy quotes from global deque to local deque.
        newquote_flag = false;
        pthread_mutex_unlock(&quotes_mutex);	// Unlock.

        // Add calculations here.
        size = quotes.size();
        mean = calc_mean(quotes);		// Process quotes.

	// EDIT: Added to lock global vector which elements updated from stdin.	
	pthread_mutex_lock(&mean_mutex);
	mean_vec.push_back(mean);
	pthread_mutex_unlock(&mean_mutex);
	// END.

	quotes.clear();
        // End of calculations.

        // Logging.
        str = "process_quotes - size: " + to_string(size) + "; mean = " + to_string(mean);
	if( ! (size % 50) )        
		cerr << str << endl << flush;
        //    my_log(processfile_name, str);
    }

    my_err(" ****** terminating process thread ****** ");
    return ((void *) 0); // or pthread_exit((void *)0)
}

float calc_mean(deque<float>& quotes)
{
    if( quotes.empty() )		// Avoid dividing by zero.
        return 0;

    float sum = 0;			// Must be initialized before accumulating.
    for(size_t i=0; i<quotes.size(); i++)
    {
        sum += quotes.at(i);
    }
    float mean = sum / quotes.size();
    my_nsleep(1000000);         	// Simulate more calculations using delay 1 milli second.

    return mean;
}

void my_err(string str)
{
    std::cerr << str << endl << flush;
}

void my_nsleep(int nanoseconds)
{
    struct timespec time = {0};
    time.tv_sec = 0;
    time.tv_nsec = nanoseconds;
    nanosleep(&time, (struct timespec*) NULL);
}

int my_log(string file_name, string str)
{
    std::ofstream fileStream;
    fileStream.open(file_name.c_str(), ios::out | ios::app);

    if (!fileStream.is_open())
    {
        my_err("my_log() cannot open logging file " + file_name);
        return -1;
    }

    fileStream << str << endl << flush;
    fileStream.close();
    return 0;
}
It seems you want to control what the app is doing. The main thread is an ideal place for this in your app. Instead of waiting for the threads to complete, it can also poll for input from stdin and process it. There isn't a need to create yet another thread.
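A minimal sketch of what polling stdin from the main thread could look like, assuming a POSIX system. The helper names input_ready and read_line_fd are hypothetical and not part of the original code. The sketch reads with read() instead of getline(cin, ...) because poll() cannot see data that cin has already buffered.

```cpp
#include <poll.h>
#include <unistd.h>
#include <string>

// Hypothetical helper: true if `fd` becomes readable (data or EOF)
// within timeout_ms milliseconds, false on timeout or error.
bool input_ready(int fd, int timeout_ms)
{
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    int rc = poll(&pfd, 1, timeout_ms);
    return rc > 0 && (pfd.revents & (POLLIN | POLLHUP)) != 0;
}

// Hypothetical helper: read one line from `fd`, one byte at a time.
// The trailing newline is not stored. Returns false if EOF or an
// error occurs before any byte was read.
bool read_line_fd(int fd, std::string& line)
{
    line.clear();
    char c;
    while (read(fd, &c, 1) == 1) {
        if (c == '\n')
            return true;
        line.push_back(c);
    }
    return !line.empty();
}
```

In Run(), the main thread could then call input_ready(STDIN_FILENO, 200) in a loop, handle a command whenever it returns true, and otherwise check whether the worker threads have finished before polling again.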
That was fast & easy - thank you! Yes, I want control and I also want to edit data through command line.

I edited my code a bit; see the EDIT in my original post above.
All shared data must be synchronized; that aside, it sounds OK.
Do I need a lock on mean_vec? I am not sure how I would implement it here.
Use a new mutex. You lock it before each access.

Alternatively, you can use a read-write lock (rwlock), since you need to check the value to see if it has been changed before you actually modify it.
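The rwlock alternative might look roughly like the sketch below, assuming POSIX pthread_rwlock_t. The names mean_rwlock, get_mean, scale_mean and append_mean are hypothetical; only mean_vec comes from the posted code. POSIX rwlocks cannot be upgraded from read to write, so the check and the modification are done together under the write lock.

```cpp
#include <pthread.h>
#include <cstddef>
#include <vector>

pthread_rwlock_t mean_rwlock = PTHREAD_RWLOCK_INITIALIZER;  // hypothetical name
std::vector<float> mean_vec;

// Read-only access: several threads may hold the read lock at once.
bool get_mean(std::size_t idx, float& out)
{
    bool found = false;
    pthread_rwlock_rdlock(&mean_rwlock);
    if (idx < mean_vec.size()) {
        out = mean_vec[idx];
        found = true;
    }
    pthread_rwlock_unlock(&mean_rwlock);
    return found;
}

// Check-then-modify: both steps run under one exclusive write lock.
bool scale_mean(std::size_t idx, float factor)
{
    bool updated = false;
    pthread_rwlock_wrlock(&mean_rwlock);
    if (idx < mean_vec.size()) {
        mean_vec[idx] *= factor;
        updated = true;
    }
    pthread_rwlock_unlock(&mean_rwlock);
    return updated;
}

// Appending can reallocate the vector, so it also needs the write lock.
void append_mean(float mean)
{
    pthread_rwlock_wrlock(&mean_rwlock);
    mean_vec.push_back(mean);
    pthread_rwlock_unlock(&mean_rwlock);
}
```

With only one reader and one writer, as in this thread, a plain mutex is likely just as good; an rwlock mainly pays off when many threads read concurrently.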
I updated the code, adding a mutex to lock the global vector mean_vec when I add new elements to it in the process_quotes thread. I am not sure whether I have to lock it when I update the elements in the main thread, because if it's not locked then it's available; those are the only two places where I access mean_vec. What do you say?
My getline(cin, input_line) in the main thread (Run() function) also reads whatever messages I output into cerr from other threads thfun_receive_thread and thfun_process_thread. Is it possible to make getline() read only cout and not cerr?
> I am not sure if I have to lock it when I update the elements in the main thread
You need to lock it before you read or modify it.
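As a rough sketch of that advice, both the appending thread and the main thread take the same mean_mutex around every access to mean_vec. The function names append_mean and scale_mean are hypothetical wrappers introduced for illustration; mean_mutex and mean_vec are the globals from the posted code.

```cpp
#include <pthread.h>
#include <cstddef>
#include <vector>

pthread_mutex_t mean_mutex = PTHREAD_MUTEX_INITIALIZER;
std::vector<float> mean_vec;

// Called from the process_quotes thread.
void append_mean(float mean)
{
    pthread_mutex_lock(&mean_mutex);
    mean_vec.push_back(mean);       // may reallocate the vector's storage
    pthread_mutex_unlock(&mean_mutex);
}

// Called from the main thread with the index read from stdin.
// Returns true if the element existed and was scaled.
bool scale_mean(int idx, float factor)
{
    bool updated = false;
    pthread_mutex_lock(&mean_mutex);
    if (idx >= 0 && static_cast<std::size_t>(idx) < mean_vec.size()) {
        mean_vec[idx] *= factor;
        updated = true;
    }
    pthread_mutex_unlock(&mean_mutex);
    return updated;
}
```

The lock in the main thread matters even though the main thread only edits existing elements: push_back in the other thread can move the whole vector while the main thread is reading or writing an element.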

> those are the only two places where I access the mean_vec. What do you say?
That's correct.

> My getline(cin, input_line) in the main thread (Run() function) also reads whatever messages I output into cerr from other threads
I would not have expected that. I'll take a look tomorrow.
> My getline(cin, input_line) in the main thread (Run() function) also reads whatever messages I output into cerr from other threads
> I would not have expected that. I'll take a look tomorrow.

Well, actually it works all right. I am using Qt Creator, and that is what it does when I run the program under the debugger - weird behavior. Once I run my app from the command line, it's all good.

kbw - thank you for your help!