1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
#include<pthread.h> // pthread_create(), mutexes, condition variables
#include<time.h> // nanosleep(), struct timespec
#include<unistd.h> // unlink()
#include<cerrno> // errno, EINTR
#include<cstdlib> // rand()
#include<deque>
#include<fstream>
#include<iostream>
#include<string>
using namespace std;
// Not referenced anywhere in the visible code.
// NOTE(review): presumably the intended maximum length of quotes_deque - confirm.
int DEQUE_SIZE = 30;
// Log file paths. Run() removes both files at startup.
string datafile_name = "z_test_quotes.log";
string processfile_name = "z_test_process.log";
// Thread handles: quote_thread runs thfun_receive_quotes(),
// process_thread runs thfun_process_quotes().
pthread_t quote_thread, process_thread;
// Locked around every access to quotes_deque and newquote_flag.
pthread_mutex_t quotes_mutex = PTHREAD_MUTEX_INITIALIZER;
// Signalled by the receive thread after each push; waited on by the process thread.
pthread_cond_t quote_condition = PTHREAD_COND_INITIALIZER;
// Quotes shared between the two threads: the receive thread appends,
// the process thread copies the whole container.
deque<float> quotes_deque;
// Set by the receive thread when a quote has been pushed; cleared by the
// process thread once it has copied the deque.
bool newquote_flag = false;
// Thread entry points (signature required by pthread_create()).
void* thfun_receive_quotes(void* arg);
void* thfun_process_quotes(void* arg);
// Writes str and a newline to stderr.
void my_err(string str);
// Sleeps via nanosleep() for the given number of nanoseconds.
void my_nsleep(int nanoseconds);
// Appends str and a newline to the file file_name; returns 0 on success,
// -1 if the file cannot be opened.
int my_log(string file_name, string str);
// Creates both threads and waits for them to finish.
int Run();
// Program entry point: runs the demo once. The value returned by Run() is
// deliberately ignored and the process always reports success.
int main()
{
    (void)Run();
    return EXIT_SUCCESS;
}
int Run()
{
my_err(" ****** starting process... ****** ");
unlink(datafile_name.c_str());
unlink(processfile_name.c_str());
if ( pthread_create("e_thread, NULL, thfun_receive_quotes, NULL) != 0 ) // pthread_create() does not set usual errno but returns error code.
my_err(" error creating thread quote_thread ");
if ( pthread_create(&process_thread, NULL, thfun_process_quotes, NULL) != 0 )
my_err(" error creating thread eur_thread");
pthread_join( quote_thread, NULL);
pthread_join( process_thread, NULL);
my_err(" ****** terminating process ******* ");
return 0;
}
// thread function
void* thfun_receive_quotes(void* arg)
{
my_err(" ****** starting receive thread ****** ");
float quote;
for(unsigned long i=0; i<500; i++)
{
my_nsleep(1); // Simulate delay.
quote = rand() / (float)RAND_MAX; // Simulate get quotes.
pthread_mutex_lock( "es_mutex );
quotes_deque.push_back(quote);
newquote_flag = true;
pthread_mutex_unlock( "es_mutex );
pthread_cond_signal( "e_condition );
}
pthread_cancel(process_thread);
my_err(" ****** terminating receive thread ****** ");
return ((void *) 0); // or pthread_exit((void *)0)
}
// thread function
void* thfun_process_quotes(void* arg)
{
my_err(" ****** starting process thread ****** ");
int size = 0;
string str;
deque<float> quotes;
for(unsigned long j=0; ; j++)
{
pthread_mutex_lock( "es_mutex );
// Check if the new data has really arrived,
// to prevent 'spurious wakeup'.
while( !newquote_flag )
pthread_cond_wait( "e_condition, "es_mutex );
quotes = quotes_deque;
newquote_flag = false;
pthread_mutex_unlock("es_mutex);
// Add calculations here.
size = quotes.size();
my_nsleep(1000); // Simulate calculations. At 1 usec it starts skipping every other quote.
quotes.clear();
// End of calculations.
// Logging.
str = "process_quotes - size: " + to_string(size) + " * ";
cout << str << endl << flush;
// kk_log(processfile_name, str);
}
my_err(" ****** terminating process thread ****** ");
return ((void *) 0); // or pthread_exit((void *)0)
}
// Prints str followed by a newline on standard error and flushes the stream.
void my_err(string str)
{
    std::cerr << str << '\n';
    std::cerr.flush();
}
// Suspends the calling thread for at least `nanoseconds` nanoseconds.
//
// Negative values return immediately without sleeping. Values of one second
// or more are split into seconds and nanoseconds, because nanosleep() rejects
// a tv_nsec outside [0, 999999999] with EINVAL. A sleep interrupted by a
// signal handler (EINTR) is resumed for the remaining time.
void my_nsleep(int nanoseconds)
{
    if (nanoseconds < 0)
        return;

    const long nanos_per_second = 1000000000L;
    struct timespec request = {};
    request.tv_sec = nanoseconds / nanos_per_second;
    request.tv_nsec = nanoseconds % nanos_per_second;

    struct timespec remaining = {};
    while (nanosleep(&request, &remaining) == -1 && errno == EINTR)
        request = remaining;
}
// Appends str plus a newline to the file named file_name, creating the file
// if necessary.
//
// Returns 0 after writing, or -1 (after reporting on stderr through my_err())
// when the file cannot be opened.
int my_log(string file_name, string str)
{
    std::ofstream log_file(file_name.c_str(), ios::out | ios::app);
    if (log_file.is_open())
    {
        log_file << str << endl;
        log_file.close();
        return 0;
    }

    my_err("my_log() cannot open logging file " + file_name);
    return -1;
}
|