Review needed multithreaded (pthreads) tcp server communication COBAN protocol

Hello fellow programmers,

I have been having a lot of fun learning C++ on this website. For the peoples who made the tutorials, my compliments on the clarity and detail!

To put what I learned into practice I am writing a multithreaded server that accepts connections from multiple client GPS trackers (COBAN protocol) and would really appreciate if someone could review my code with regard to the multithreading part in main. (COBAN not yet implemented fully)

The design is basically thus:
- create socket
- initially start X amount of threads and detaches
- If "enough" threads are available, run "idle" thread
- each started thread accepts a connection, decrements thread counter and ends idle thread so main can recreate more threads
- mutex lock ensures integrity of thread counter

All comments are welcome as I am learning the language and I expect my design and implementation to be far from correct or optimal.

For instance: in void *communicate I am not happy about emptying and reusing variables. How to do it better? I don't know..

Oh and the code is here: http://pastebin.com/Mex0MrZ3
Last edited on
On style, I think it's very wordy and distracting, like a an overgrown garden. You need to weed out all the "bits of code". I mean things inline error handling and management of state variables.

C++ syntax has some nice features to hide all that stuff. State should be managed in a class. Structs can have constructors that will remove their initialization from your code.

On design, rather than spinning up threads that sit around and die after one use, you could create a thread pool that reuses a dynamically configurable number of threads.

Just because you're using Procedural code, does not mean that your code should follow the "flow of conscience" style.
Thanks kbw, that is helpful indeed! I will weed it out.. :) Could you verify that I am on the right track with the following todo list?

1. weed out things: inline error handling
- Use try/catch?

2. Manage state variables in a class
- Create class with function for increment/decrement?

3. Structs can have constructors that will remove their initialization from your code
- Are you referring to unneeded initialisation of "struct socket_structure SS;" (line 101?). I am having difficulty applying what I read about constructors to you comment on my code :(

4. I will take your suggestion to implement a thread pool into account and think on an approach. I gather this will decrease overhead and improve performance?
You really want a main that looks something like:
1
2
3
4
5
6
7
8
9
10
int main()
{
	int s = ServerSocket("0.0.0.0", portnr);

	Connection conn;
	while (Accept(s, conn))
		HandleConnect(conn);

	close(s);
}


I'll paste your code in for posterity.
http://pastebin.com/Mex0MrZ3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <iostream>
#include <unistd.h>
#include <sstream>
#include <errno.h>
#include <string.h> //(memset,)
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define PORT 65530
#define BACKLOG 10
#include <time.h>
//g++ thread_posix_tcp3.cpp -lpthread -o thread_posix_tcp3.o
//learning mutli threading
//todo:
// parameterize listen port
// bounds checking socket vars
// !stresstest hanging threads
// timeout socket accept
// !idle thread
// catch ctrl-c and shutdown scoket gracefully
// heartbeat? todo: imei nr check
// implement COBAN fully
 
//define posix thread "idle"
pthread_t idleThr;
 
//structure of three floats: N/E and speed
struct threeInts{
        double N,E;
        float sp;
};
 
//returns structure from tracker response string. Call: struct threeInts test;test=returnNEsp(s);
struct threeInts returnNEsp(char s[200]){
        //define structure and set values
        struct threeInts NEsp;
        const char* p;
        int cnt=0;
        for (p = strtok( s, "," );  p;  p = strtok( NULL, "," ))
        {
          if(cnt==7)
                NEsp.N=atof(p);
          if(cnt==9)
                NEsp.E=atof(p);
          if(cnt==11)
                NEsp.sp=atof(p);
          ++cnt;
        }
        return NEsp;
}
 
void *idleThread(void *arg){
    //printf("We are now in idleThread..\n");
    int i;
    for(i=0;i<60;++i){
        //printf("\tIdlesec %i\n",i);
        sleep(1);
    }
    pthread_exit(NULL);
}
 
pthread_mutex_t a_mutex = PTHREAD_MUTEX_INITIALIZER;
int totThr=2;
int thrCnt=totThr;
struct socket_structure{
    int welcomeSocket;
    struct sockaddr_storage serverStorage;
    int *thrCnt;
};
 
void *communicate(void *arg1);
 
int main(int argc, char *argv[]){
    int portnr=7778;
    pthread_t thread1[totThr];
    int welcomeSocket;
    struct sockaddr_in serverAddr;
    struct sockaddr_storage serverStorage;
    socklen_t addr_size;
   
    welcomeSocket = socket(PF_INET, SOCK_STREAM, 0);
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(portnr);
    serverAddr.sin_addr.s_addr = inet_addr("0.0.0.0");
    memset(serverAddr.sin_zero, '\0', sizeof serverAddr.sin_zero);  
    bind(welcomeSocket, (struct sockaddr *) &serverAddr, sizeof(serverAddr));
    //Listen on the socket, with totThr max connection requests queued
    if(listen(welcomeSocket,totThr)==0)
      printf("Listening in port %i\n",portnr);
    else
      printf("Error\n");
    addr_size = sizeof serverStorage;
 
    //define vars
    int cnt=0;
    int start=1;
    //structure needed to start thread and pass multiple parameters
    struct socket_structure SS;
    SS.welcomeSocket=welcomeSocket;
    SS.serverStorage=serverStorage;
    SS.thrCnt=&thrCnt;
    int rc;
    //start initial threads
    if(start==1){
        int j;
        for(j=0;j<totThr;++j){
            rc=pthread_create( &thread1[j], NULL, communicate, (void*) &SS);
            if (rc == 0){
                //printf("thread[%i] created.. detaching ",j);
                    rc = pthread_detach(thread1[j]);
                if(rc!=0)
                    printf("failed..\n");
            }
        }
        start=0;
    }
    //main loop
    for(;;){
        //if threadcnt eq total threads, start idlethread function
        if(thrCnt==totThr){
            //call idleThread, we have enough threads
            rc=pthread_create( &idleThr, NULL, idleThread, (void*) NULL);
            pthread_join(idleThr, NULL);
        }
        //threads lower than totThr.. start new threads
        if(thrCnt<totThr){
            pthread_mutex_lock(&a_mutex); //lock mutex for thrCnt
            while(thrCnt < totThr){
                rc=pthread_create( &thread1[thrCnt], NULL, communicate, (void*) &SS);
                if (rc == 0){
                    //detach newly created thread
                        rc = pthread_detach(thread1[thrCnt]);
                    if(rc!=0){
                        printf("failed..\n");
                    }else{
                        ++thrCnt;
                    }
                }
            }
            pthread_mutex_unlock(&a_mutex);//unlock mutex
        }
        ++cnt;
    }
 
    pthread_exit(NULL);
}
 
void *communicate(void *arg1){
    char buffert[200]; //strcpy(buffert,"");
    char response[200];
    pthread_t id = pthread_self();
    struct socket_structure *SS;
    SS=(struct socket_structure*)arg1;
    socklen_t addr_size;addr_size= sizeof SS->serverStorage;
   
    //accept connection
    int newSocket = accept(SS->welcomeSocket, (struct sockaddr *) &SS->serverStorage, &addr_size);
 
    //mutex and decrement thread counter
    pthread_mutex_lock(&a_mutex);
    --*SS->thrCnt;
    pthread_mutex_unlock(&a_mutex);
 
    //receive auth
    if (!recv(newSocket, response, 30,0)) {
        close(newSocket);
        pthread_cancel(idleThr);
        int retcode=200;
        pthread_exit(&retcode);
        }
   
    if(bcmp("##,imei:868683024944313,A;",response,26) == 0){
        strcpy(buffert,"LOAD\n");
    }else{
        //failed auth, exit comm thread
        strcpy(buffert,"NOPE\n");
        close(newSocket);
        pthread_cancel(idleThr);
        int retcode=200;
        pthread_exit(&retcode);
    }
    memset(response,0,sizeof(response));
    send(newSocket,buffert,6,0);
 
    //server request Multiple position, every 1 minutes **,imei:359446018966098,C,1m   
    strcpy(buffert,"**,imei:3544446018966098,C,1m\n");
    send(newSocket,buffert,30,0);
    memset(buffert,0,sizeof(buffert));
 
    //receive responses again
    while(true) {
        if(recv(newSocket, response, 1024,0)){
            if(bcmp("imei",response,4)==0){
                //get N/E and speed
                struct threeInts NEsp=returnNEsp(response);
                printf("Tracker reported N: %.4f, E: %.4f speed: %.2f\n",NEsp.N,NEsp.E,NEsp.sp);
               
                //todo: write to tracking file
               
            }else{
                //heartbeat? todo: imei nr check or else disconnect client
                printf("Responding 'ON' to hearbeat request\n");
                strcpy(buffert,"ON\n");
                    send(newSocket,buffert,10,0);
            }
            memset(response,0,sizeof(response));
            memset(buffert,0,sizeof(buffert));
        }
        sleep(1);
        }
    strcpy(buffert,"ok bynow\n");
    send(newSocket,buffert,10,0);
 
    memset(response,0,sizeof(response));
   
    close(newSocket);
    //printf("\tExiting communication thread\n");
    pthread_cancel(idleThr);
    pthread_exit(NULL);
    return NULL;
}
I've not tested it, but I completed the server as I suggested. It routes messages between clients. I've not tested it, so it probably doesn't work (yet).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
#include <unistd.h>
#include <sys/socket.h>		// socket
#include <netinet/ip.h>		// sockaddr_in
#include <arpa/inet.h>		// inet_addr
#include <thread>
#include <mutex>
#include <deque>
#include <string>
#include <list>
#include <vector>
#include <string.h>			// memset

typedef std::string string;
typedef std::vector<string> strings;
typedef std::vector<char> chars;

struct Connection
{
	int s;
	sockaddr_in addr;

	Connection() : s(-1) { memset(&addr, 0, sizeof(addr)); }
};

struct Context
{
	Connection	conn;

	std::mutex in_mtx;
	std::deque<string> in_queue;
	std::thread* thread;

	Context(Connection& conn) : conn(conn), thread(nullptr) {}
};
typedef std::list<Context> Contexts;

Connection ServerSocket(string tcp_addr, int16_t port);
bool Accept(Connection& server, Connection& client);
void Shutdown(Connection& server);
void HandleConnect(Connection& conn);

// Globals
bool g_stop;
std::mutex g_ctx_mtx;
Contexts g_ctx;

int main()
{
	Connection server = ServerSocket("0.0.0.0", 2345);

	Connection client;
	while (Accept(server, client))
		HandleConnect(client);

	Shutdown(server);
}

Connection ServerSocket(string tcp_addr, int16_t port)
{
	Connection conn;
	conn.s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (conn.s == -1)
		throw std::runtime_error(string("socket() failed. error=") + string(strerror(errno)));

	conn.addr.sin_family = AF_INET;
	conn.addr.sin_port = htons(port);
	conn.addr.sin_addr.s_addr = inet_addr(tcp_addr.c_str());
	int ret = bind(conn.s, (sockaddr*)&conn.addr, sizeof(conn.addr));
	if (ret == -1)
		throw std::runtime_error(string("bind() failed. error=") + string(strerror(errno)));

	ret = listen(conn.s, 50);
	if (ret == -1)
		throw std::runtime_error(string("listen() failed. error=") + string(strerror(errno)));

	return conn;
}

bool Accept(Connection& server, Connection& client)
{
	Connection local;
	socklen_t len = sizeof(local.addr);
	local.s = accept(server.s, (sockaddr*)&local.addr, &len);
	if (local.s == -1)
		return false;

	client = local;
	return true;
}

void Shutdown(Connection& server)
{
	g_stop = true;
	shutdown(server.s, SHUT_RD);
	sleep(4);	// allow system to settle

	for (Context& ctx : g_ctx)
	{
		close(ctx.conn.s);

		ctx.thread->join();
		delete ctx.thread; ctx.thread = nullptr;
	}

	close(server.s);
}


// Client Handling
void ClientHandler(Contexts::iterator me);

void HandleConnect(Connection& conn)
{
	std::lock_guard<std::mutex> lock(g_ctx_mtx);
	g_ctx.emplace_front(conn);
	g_ctx.begin()->thread = new std::thread(ClientHandler, g_ctx.begin());
}

// Maintain message flow between ALL connections
bool ReadMsg(Contexts::iterator me, chars& msg, int& msglen);
int PushMsg(Contexts::iterator me, chars& msg, int& msglen);
int FlushQueue(Contexts::iterator me);
void Disconnect(Contexts::iterator me);

void ClientHandler(Contexts::iterator me)
{
	chars msg(4096);
	int msglen = 0;

	while (!g_stop)
	{
		// listen for input from this client
		ReadMsg(me, msg, msglen);

		// push message onto other client queues
		if (msglen)
			PushMsg(me, msg, msglen);

		// flush queue to this client
		if (!me->in_queue.empty())
			FlushQueue(me);
	}
}

bool ReadMsg(Contexts::iterator me, chars& msg, int& msglen)
{
	fd_set read_fds;
	FD_ZERO(&read_fds);
	FD_SET(me->conn.s, &read_fds);

	struct timeval tv { 0, 100 * 1000 };	// 100 ms

	switch (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv))
	{
	case 1:
		if (FD_ISSET(me->conn.s, &read_fds))
		{
			std::lock_guard<std::mutex> lock(me->in_mtx);
			msglen = recv(me->conn.s, &msg.front(), msg.size(), 0);
			if (msglen == -1)
			{
				Disconnect(me);
				msglen = 0;
			}
		}
		break;

	case 0:		// timeout
	default:	// error?
		break;
	}
	
	return msglen > 0;
}

int PushMsg(Contexts::iterator me, chars& msg, int& msglen)
{
	int cnt = 0;

	for (Contexts::iterator p = g_ctx.begin(); p != g_ctx.end(); ++p)
		if (p != me)
		{
			std::lock_guard<std::mutex> lock(p->in_mtx);
			p->in_queue.emplace_front(&msg.front(), msglen);
			++cnt;
		}

	return cnt;
}

int FlushQueue(Contexts::iterator me)
{
	int cnt = 0;

	std::lock_guard<std::mutex> lock(me->in_mtx);
	while (!me->in_queue.empty())
	{
		send(me->conn.s, me->in_queue.back().c_str(), me->in_queue.back().size(), 0);
		me->in_queue.pop_back();
		++cnt;
	}

	return cnt;
}

void Disconnect(Contexts::iterator me)
{
	std::lock_guard<std::mutex> lock(g_ctx_mtx);
	g_ctx.erase(me);
}
Last edited on
That looks amazing, thanks for the example code! That is extremely helpful!

I am kind of surprised there are no comments though, is that not common practice when coding in C++?
There is nothing describing what it does and how, but I was kinda hoping it was self documenting.

The alternative is this style, which I hope you agree is kinda pointless.
1
2
3
4
5
6
7
8
9
10
// Disconnect a thread from the list of running threads, context list
//	me - threads iterator into context list
void Disconnect(Contexts::iterator me)
{
	// lock the context list
	std::lock_guard<std::mutex> lock(g_ctx_mtx);

	// remove me from the list
	g_ctx.erase(me);
}
Your code definitely reads well without comments. Just trying to get a feel for what to expect in a "professional" setting.. thanks for all the help and examples, that is great. I will be working on improving my code using your directions.
Topic archived. No new replies allowed.