I need some input on using a mutex and a condition variable.

[UPDATED]

I do not think my code is doing what I want it to do. I want the receive() thread to run in real time, as fast as possible, without waiting for the process() thread. The process() thread can be slower than receive(), so it does not have to run its calculations every time a new quote is received. To simulate some calculations, I inserted a 20-millisecond sleep() into the process() thread function (it can be longer, but the result is the same); however, it slows down the whole process, both threads. I'd expect the receive() thread to finish first, and the process() thread not to be able to run every time I add a new quote to the deque.

Thank you.


Here is my updated code. To build:
 
g++ -g -O0 -fno-inline -std=c++0x -pthread test.cpp -o test


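#include <iostream>
#include <string>
#include <deque>
#include <cstdlib>    // rand()
#include <fstream>
#include <pthread.h>  // pthread_create(), mutex and condition variable
#include <time.h>     // nanosleep()
#include <unistd.h>   // unlink()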
using namespace std;

int DEQUE_SIZE = 30;
string datafile_name = "z_test_quotes.log";
string processfile_name = "z_test_process.log";
pthread_t quote_thread, process_thread;
pthread_mutex_t quotes_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  quote_condition = PTHREAD_COND_INITIALIZER;
deque<float> quotes_deque;
bool newquote_flag = false;

void* thfun_receive_quotes(void* arg);
void* thfun_process_quotes(void* arg);
void my_err(string str);
void my_nsleep(int nanoseconds);
int my_log(string file_name, string str);
int Run();

int main()
{
    Run();
    return 0;
}

int Run()
{
    my_err(" ******  starting process... ****** ");
    unlink(datafile_name.c_str());
    unlink(processfile_name.c_str());

    if ( pthread_create(&quote_thread, NULL, thfun_receive_quotes, NULL) != 0 ) // pthread_create() does not set usual errno but returns error code.
        my_err(" error creating thread quote_thread ");

    if ( pthread_create(&process_thread, NULL, thfun_process_quotes, NULL) != 0 )
        my_err(" error creating thread process_thread ");

    pthread_join( quote_thread, NULL);
    pthread_join( process_thread, NULL);

    my_err(" ******  terminating process ******* ");

    return 0;
}

// thread function
void* thfun_receive_quotes(void* arg)
{
    my_err(" ******  starting receive thread ****** ");
    float quote;

    for(unsigned long i=0; i<500; i++)
    {
        my_nsleep(1);   // Simulate delay.

        quote = rand() / (float)RAND_MAX;       // Simulate get quotes.
        pthread_mutex_lock( &quotes_mutex );

        quotes_deque.push_back(quote);

        newquote_flag = true;
        pthread_mutex_unlock( &quotes_mutex );
        pthread_cond_signal( &quote_condition );
    }

    pthread_cancel(process_thread);

    my_err(" ******  terminating receive thread ****** ");
    return ((void *) 0); // or pthread_exit((void *)0)
}

// thread function
void* thfun_process_quotes(void* arg)
{
    my_err(" ******  starting process thread ****** ");
    int size = 0;
    string str;
    deque<float> quotes;

    for(unsigned long j=0; ; j++)
    {
        pthread_mutex_lock( &quotes_mutex );

        // Check if the new data has really arrived,
        // to prevent 'spurious wakeup'.
        while(  !newquote_flag )
            pthread_cond_wait( &quote_condition, &quotes_mutex );
        quotes = quotes_deque;
        newquote_flag = false;
        pthread_mutex_unlock(&quotes_mutex);

        // Add calculations here.
        size = quotes.size();
        my_nsleep(1000);         // Simulate calculations. At 1 usec it starts skipping every other quote.
        quotes.clear();
        // End of calculations.

        // Logging.
        str = "process_quotes - size: " + to_string(size) + " * ";
        cout << str << endl << flush;
        //    my_log(processfile_name, str);
    }

    my_err(" ****** terminating process thread ****** ");
    return ((void *) 0); // or pthread_exit((void *)0)
}

void my_err(string str)
{
    std::cerr << str << endl << flush;
}

void my_nsleep(int nanoseconds)
{
    struct timespec time = {0};
    time.tv_sec = 0;
    time.tv_nsec = nanoseconds;
    nanosleep(&time, (struct timespec*) NULL);
}

int my_log(string file_name, string str)
{
    std::ofstream fileStream;
    fileStream.open(file_name.c_str(), ios::out | ios::app);

    if (!fileStream.is_open())
    {
        my_err("my_log() cannot open logging file " + file_name);
        return -1;
    }

    fileStream << str << endl << flush;
    fileStream.close();
    return 0;
}
For one thing, this does not compile: I had to replace the mysterious itos() and dtos() with std::to_string(), and I dummied out log_tofile().

As for why receive_data() gets to iterate multiple times between each iteration of process_data(): there is no wait of any kind in receive_data(). As soon as process_data() unlocks the mutex, receive_data() runs for as long as the scheduler will let it.
Sorry about the source; I updated it so it compiles now. There is a delay in receive_data() using a for() loop.

My intention is for receive_data() to receive the data over sockets in real time, lock the prices vector while updating it, and, once the update is done, signal process_data() that new data are available so it can start its calculations. When the data are coming in very fast, process_data() can take longer than receive_data(), so process_data() will not run every time a new price arrives; when the data are coming in slowly, process_data() should run every time a new price arrives.

Right now I expect process_data() to run every time a new price arrives, but it does not. What is wrong with my code? Thank you!

process_data size: 23
process_data size: 24
process_data size: 26
process_data size: 28
process_data size: 29
process_data size: 30
process_data size: 31
process_data size: 33
process_data size: 35
process_data size: 37
process_data size: 38
process_data size: 40
process_data size: 41
process_data size: 43
process_data size: 44
process_data size: 45
process_data size: 46
process_data size: 48
process_data size: 49
process_data size: 50
Now I got it about the wait - thank you for your help!
Cubbi, what compiler do you use? With gcc 4.5.2 I get the error 'to_string' is not a member of 'std'. I know to_string() is part of C++11; I guess gcc does not support it yet. It is a handy function to have as part of the language or standard library.
There is a delay in receive_data() using for() loop.

Not really, that loop does nothing and is removed by the compiler at any useful optimization setting.

With gcc 4.5.2 I get an error 'to_string' is not a member of 'std'.

You need -std=c++0x to enable it in 4.5.2.

Incidentally, 4.5.2 supports C++ threads, so that delay could have been done with std::this_thread::sleep_for(std::chrono::milliseconds(100)); for example.
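A minimal sketch of that sleep_for-based delay, assuming a compiler and standard library with C++11 <thread> and <chrono> support. The helper names simulate_delay_us() and timed_delay_us() are made up for illustration and are not part of the posted code.

```cpp
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical helper: blocks the calling thread for at least `us` microseconds.
void simulate_delay_us(long us)
{
    assert(us >= 0);
    std::this_thread::sleep_for(std::chrono::microseconds(us));
}

// Hypothetical helper: measures how long one simulated delay really took,
// in microseconds, using the monotonic steady_clock.
long timed_delay_us(long us)
{
    const std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
    simulate_delay_us(us);
    const std::chrono::steady_clock::time_point stop = std::chrono::steady_clock::now();
    return static_cast<long>(
        std::chrono::duration_cast<std::chrono::microseconds>(stop - start).count());
}
```

Unlike an empty for() loop, the sleep is not removed by the optimizer. The measured time can be noticeably longer than the requested one, since the scheduler decides when the thread resumes.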
Yeah, I figured out the wait after you told me earlier. to_string() is great. I am using nanosleep(). I appreciate your help with this!
My code is not doing what I want. I updated my first post. Could someone take a look - thank you!
What is it doing, and what is the expected output? Offhand, I see an if instead of a while when waiting on a condition variable.
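To illustrate the if-versus-while point, here is a small pthreads sketch. All names (demo_mutex, demo_cond, data_ready, wait_for_value, publish_value, run_demo) are invented for this example. The waiter re-checks the flag in a while loop, so it behaves the same whether the signal arrives before the wait, after it, or the wait returns spuriously.

```cpp
#include <cassert>
#include <pthread.h>

static pthread_mutex_t demo_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  demo_cond  = PTHREAD_COND_INITIALIZER;
static bool data_ready   = false;  // the predicate, guarded by demo_mutex
static int  shared_value = 0;      // the data, guarded by demo_mutex

// Consumer side: blocks until data_ready is true, then takes the value.
int wait_for_value()
{
    pthread_mutex_lock(&demo_mutex);
    while (!data_ready)            // 'while', not 'if': re-check after every wakeup
        pthread_cond_wait(&demo_cond, &demo_mutex);
    const int value = shared_value;
    data_ready = false;
    pthread_mutex_unlock(&demo_mutex);
    return value;
}

// Producer side: publishes a value and wakes the waiting thread.
void publish_value(int value)
{
    pthread_mutex_lock(&demo_mutex);
    shared_value = value;
    data_ready = true;
    pthread_mutex_unlock(&demo_mutex);
    pthread_cond_signal(&demo_cond);
}

static void* producer_thread(void* arg)
{
    publish_value(*static_cast<int*>(arg));
    return NULL;
}

// Starts one producer thread and returns what the consumer side received.
int run_demo(int value)
{
    pthread_t producer;
    const int rc = pthread_create(&producer, NULL, producer_thread, &value);
    assert(rc == 0);
    (void)rc;
    const int received = wait_for_value();
    pthread_join(producer, NULL);
    return received;
}
```

With an if in place of the while, a wakeup that happens while data_ready is still false would let the consumer read shared_value before anything was published.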
You are right about while().

The output right now:
process_quotes - size: 1 *
process_quotes - size: 2 *
process_quotes - size: 3 *
process_quotes - size: 4 *
process_quotes - size: 5 *
process_quotes - size: 6 *
process_quotes - size: 7 *
process_quotes - size: 8 *
process_quotes - size: 9 *
process_quotes - size: 10 *
................................
process_quotes - size: 50 *


Because I added a sleep in the process thread function, I expect the output to be something like:
process_quotes - size: 1 *
process_quotes - size: 4 *
process_quotes - size: 7 *
process_quotes - size: 10 *
................................
process_quotes - size: 50 *
If you expected the receive thread to keep iterating while the process thread is sleeping, that can't currently happen: the process thread sleeps with the mutex locked. The receive thread cannot proceed past line 74.
Ah, right, the receive thread cannot proceed while the process thread holds the lock on quotes_deque. I guess I can make a local copy of quotes_deque in the process thread, unlock the mutex once the copy is done, and then perform the calculations against the copy. The drawback is that this will require twice as much memory.

EDIT: I updated the first post with code that implements the copy of quotes_deque. It looks good so far. The process thread starts skipping every other quote if the sleep is 1 microsec and skips a few quotes when the sleep is 100 microsec. I'll test some more.

When the sleep is 100 microsec, the output is:
.....................................
process_quotes - size: 302 *
process_quotes - size: 305 *
process_quotes - size: 309 *
process_quotes - size: 312 *
process_quotes - size: 316 *
process_quotes - size: 319 *
process_quotes - size: 323 *
process_quotes - size: 326 *
process_quotes - size: 330 *
process_quotes - size: 333 *
.....................................
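The copy-then-unlock idea described in this post can be sketched on its own as follows. The names snap_mutex, shared_quotes, add_quote, snapshot_quotes and sum_snapshot are hypothetical, and the sum is only a stand-in for the real calculations. The mutex is held just long enough to copy the deque, so the writer is blocked for the duration of the copy and not for the duration of the calculation.

```cpp
#include <cassert>
#include <deque>
#include <numeric>
#include <pthread.h>

static pthread_mutex_t snap_mutex = PTHREAD_MUTEX_INITIALIZER;
static std::deque<float> shared_quotes;   // guarded by snap_mutex

// Writer side: append one quote under the lock.
void add_quote(float quote)
{
    const int rc = pthread_mutex_lock(&snap_mutex);
    assert(rc == 0);
    (void)rc;
    shared_quotes.push_back(quote);
    pthread_mutex_unlock(&snap_mutex);
}

// Reader side: copy the shared deque under the lock and release it at once.
std::deque<float> snapshot_quotes()
{
    const int rc = pthread_mutex_lock(&snap_mutex);
    assert(rc == 0);
    (void)rc;
    std::deque<float> copy(shared_quotes);
    pthread_mutex_unlock(&snap_mutex);
    return copy;
}

// Stand-in for the slow calculations: works on the private copy, no lock held.
float sum_snapshot()
{
    const std::deque<float> local = snapshot_quotes();
    return std::accumulate(local.begin(), local.end(), 0.0f);
}
```

The cost is the one already noted above: each snapshot duplicates the whole deque, in both memory and copy time.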
Perhaps you could explain the task in more detail? If you're simulating a single-producer, single-consumer setup with a queue, only the push operation in the producer and only the pop (and the while-empty wait) in the consumer need to be synchronized.
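A rough sketch of such a single-producer, single-consumer queue, written with the C++11 std::mutex and std::condition_variable instead of the pthreads calls in the posted code. QuoteQueue and produce_and_consume are invented names. Only push() and pop() touch the shared deque, and pop() contains the while-empty wait.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical queue: all synchronization lives inside push() and pop().
class QuoteQueue {
public:
    void push(float quote)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(quote);
        }
        not_empty_.notify_one();   // signal after releasing the lock
    }

    float pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (items_.empty())     // the "while empty" wait
            not_empty_.wait(lock);
        const float quote = items_.front();
        items_.pop_front();
        return quote;
    }

private:
    std::mutex mutex_;
    std::condition_variable not_empty_;
    std::deque<float> items_;
};

// Pushes 1..count from a producer thread and sums them in the calling thread.
float produce_and_consume(int count)
{
    assert(count >= 0);
    QuoteQueue queue;
    std::thread producer([&queue, count] {
        for (int i = 1; i <= count; ++i)
            queue.push(static_cast<float>(i));
    });
    float total = 0.0f;
    for (int i = 0; i < count; ++i)
        total += queue.pop();      // work on each quote happens outside the lock
    producer.join();
    return total;
}
```

Note that this consumes each quote once. It does not by itself cover recalculating over the full history on every arrival.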
First, I receive the quotes (prices) one by one in real time, 24x5, over sockets at a rate of about one every 100 microseconds, sometimes faster depending on the time of day. I may receive more than 100 MB of quotes a day, hence I will set a limit on the deque size (say ~1 MB), adding new quotes and removing old ones (this is not in the code right now). Right now I simulate the stream of quotes with rand() in the receive thread.

Second, I need to perform some calculations on all the quotes in the deque, so every time a new quote arrives I have to run the calculations again against all of them. I'll strive to do the calculations as fast as I receive the quotes, but depending on how fast the quotes are coming in at a particular time of day, the calculations may take longer. This I do in the process thread.

That's pretty much it. What I am doing is not optimal: I should not have two deques.
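The size limit that is not in the code yet could look roughly like this. push_bounded is a hypothetical helper and the limit is counted in elements, not bytes. In the threaded program it would be called with quotes_mutex held, like the existing push_back().

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical helper: appends a quote and drops the oldest ones so that
// the deque never holds more than max_size elements.
void push_bounded(std::deque<float>& quotes, float quote, std::size_t max_size)
{
    assert(max_size > 0);
    quotes.push_back(quote);
    while (quotes.size() > max_size)
        quotes.pop_front();   // oldest quote is at the front
}
```

For example, pushing the values 1 through 5 with max_size set to 3 leaves 3, 4 and 5 in the deque.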
You could pick a container where insert does not invalidate iterators (list or timestamp-ordered set), and replace quotes = quotes_deque; with the much faster list<>::const_iterator first = quotes_list.begin();.

But if you introduce a third thread that is erasing concurrently, then it would have to communicate its intent to erase to the processor and wait for it to acknowledge that it won't use the soon-to-be-erased stale data, or just have the processor do the erasing (e.g. make it the Consumer).
Hi Cubbi, I think I'll just go with the list for now. I do not want to overcomplicate things with another thread, mainly because I'll have several process (consumer) threads.
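A single-threaded sketch of the iterator-stability property this suggestion relies on: iterators into a std::list stay valid when more elements are pushed back later. The function names are invented. In the threaded program, capturing the two iterators would still happen under the mutex, and this sketch makes no claim about what else is needed for safe concurrent traversal.

```cpp
#include <cassert>
#include <list>

// Sums the elements in the inclusive range [first, last].
float sum_inclusive(std::list<float>::const_iterator first,
                    std::list<float>::const_iterator last)
{
    float total = 0.0f;
    for (std::list<float>::const_iterator it = first; ; ++it) {
        total += *it;
        if (it == last)
            break;            // never steps past the captured last element
    }
    return total;
}

// Captures a range, appends another quote, then reads the captured range.
float demo_stable_iterators()
{
    std::list<float> quotes_list;
    quotes_list.push_back(1.0f);
    quotes_list.push_back(2.0f);
    assert(!quotes_list.empty());

    // Cheap "snapshot": two iterators instead of a copy of the container.
    std::list<float>::const_iterator first = quotes_list.begin();
    std::list<float>::const_iterator last = quotes_list.end();
    --last;                   // now points at the newest element (2.0f)

    quotes_list.push_back(3.0f);   // a later insert; first and last stay valid

    return sum_inclusive(first, last);   // covers 1.0f and 2.0f only
}
```

Erasing is the operation that breaks this, which is why the erase has to be coordinated with the processing thread as described above.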

-- thank you for all your help!