C++ Thread pool class

Hi all
i am trying to create a thread pool class using c++
i already create one thread pool using boost asio and boost thread
however i am trying by following approch and stuck at some point.
can u suggest any solution.
thanks in advance.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

#include <boost\asio.hpp>
#include <iostream>
#include <boost\thread\thread.hpp>
#include <boost\bind.hpp>
#include <boost\thread\mutex.hpp>
#include <stack>

using namespace std;
using namespace boost;

class ThreadPool
{
	static int count;
	int NoOfThread;
	thread_group grp;
	mutex mutex_;
	asio::io_service io_service;
	int counter;
	stack<thread*> thStk ;

public:
	ThreadPool(int num)
	{	
		NoOfThread = num;
		counter = 0;
		mutex::scoped_lock lock(mutex_);

		if(count == 0)
			count++;
		else
			return;
		
		for(int i=0 ; i<num ; ++i)
		{
			thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
		}
	}
	~ThreadPool()
	{
		io_service.stop();
		grp.join_all();
	}

	thread* getThread()
	{
		if(counter > NoOfThread)
		{
			cout<<"run out of threads \n";
			return NULL;
		}
		
		counter++;
		thread* ptr = thStk.top();
		thStk.pop();
		return ptr;
	}
};
int ThreadPool::count = 0;


struct callable
{
    void operator()()
	{
		cout<<"some task for thread \n";
	}
};

int main( int argc, char * argv[] )
{
	
	callable x;
	ThreadPool pool(10);
	thread* p = pool.getThread();
	cout<<p->get_id();

	//how i can assign some function to thread pointer ?
	//how i can return thread pointer after work done so i can add 
//it back to stack?


	return 0;
}
	
I thought the idea of a thread pool was to create a set of threads, up to some maximum number, and have them process tasks of a task queue.

You then push work items onto the queue and the they're processed by the threads in the set concurrently, when each thread completes its task, it fetches the next work item off the queue until the queue is empty.

Your code doesn't appear to be doing anything like that. How does your thread pool work in principle?
thanks for taking interest.

there is no predefined queue for task.
my steps are
1> runtime i will get task.
2>i have to keep threads ready in pool.(not to create one on time) asign a thread to do task.
3>return a thread to pool.
4>pool of fix size, if more request comes , sorry (some limits up to max no of concerent threads only may be something like SQL connction pool).

well i achive it using boost::asio & using io_stream::post functionality
but i want to create general class for same.



It sounds like we're in agreement on what needs to happen, but I don't see anything on the public interface of your thread pool class that supports that.
thanks kbw,
ok here is a another code which actiually do my job.
i make it simple by removing few things.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include<iostream>
#include<boost\thread\thread.hpp>
#include <boost\bind.hpp>
#include <boost\asio.hpp>

using namespace std;
using namespace boost;


//interface which has to follow
class process
{
public:
	virtual void doProcess()=0;
	virtual ~process()
	{
	}
};

//specific implementation 1
class processType1:virtual public process
{
	int x;
public:
	processType1(int i)
	{
		x = i;
	}
	virtual void doProcess()
	{
		cout<<"some process "<<x<<"\n";
		Sleep(1000);
	}
	virtual ~processType1()
	{

	}
	
};

std::shared_ptr<process> getObj(int i) //just for simplicity
{
	return std::shared_ptr<process>(new processType1(i));
}

void threadfun(std::shared_ptr<process> obj) // thread function to bind
{
	 obj->doProcess();
}

int main()
{
	asio::io_service io_service;
	asio::io_service::work work(io_service);
	
	thread_group threads; // thread pool of 10 threads
  
	for (std::size_t i = 0; i < 10; ++i)
		threads.create_thread(boost::bind(&asio::io_service::run, &io_service)); 

	for(int i=0 ; i<100 ; ++i) //this is infinite loop i make it simple
	{
		std::shared_ptr<process> obj = getObj(i); //i will be getting object here 

		io_service.post(boost::bind(threadfun, obj));//assign task to one of thread in pool
	}

	
	io_service.stop();
    threads.join_all();
	return 0;
}


now my intention is to remove interface, just a standalone class which will give threads ready to assig any task at run time. the task/process class may go on changing.
The implementation should never make it's way into the user's code. Interface is everything. It should look easy to use. I would expect:

1. Some kind of Task object with some kind of pure virtual Execute method that you derive from to make up your work units.

2. Some kind of ThreadPool object that you initialise with the number of threads, and minimally, an AddTask method that takes a dynamically allocated Task*.

3. Some way to pickup completed Tasks.

The user code creates a ThreadPool initialised with the number of threads, then adds Task objects to the ThreadPool instance and picks up the results somewhere.

My approach would be to think about how the whole thing would hang together and write it down in UML, then build the first draft, and improve as necessary in subsequent iterations.
well my 2'nd code meets above requirements.
the only change that need is to is to wrap up some functionality in main in some class.
however my 1'st code will be still having problems

1> how i can assign some function to thread pointer ?
2> how i can return thread pointer after work done so i can add it back to stack?

i go through boost thread i don't get anything which can add function to existing thread.
and there is no fnctionality in thread pool class to post task to one of thread in pool.

so only way remain is to put wrapper over boost::asio and make it close to above requirement.

after considering above points. i write new code.

now only problem is where should i decrese thread counter after my thread is done with its work ?

i am not getting any place to put counter++.
can some one help ?


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include <boost\asio.hpp>
#include <iostream>
#include <boost\thread\thread.hpp>
#include <boost\bind.hpp>
#include <boost\thread\mutex.hpp>
#include <stack>

using namespace std;
using namespace boost;

class process
{
	
public:
	virtual void run()=0;
	virtual ~process()=0
	{
	}
};

class processType1:virtual public process
{
public:
	int id;
	processType1(int i)
	{
		id = i;

	}
	virtual void run()
	{
		cout<<"\n your task goes here"<<id<<" \n";
		
	}
	virtual ~processType1()
	{
	}
};

class ThreadPool
{
	static int count;
	
	int counter;
	int NoOfThread;
	thread_group grp;
	mutex mutex_;
	asio::io_service io_service;
	stack<thread*> thStk ;

public:
	ThreadPool(int num)
	{	
		mutex::scoped_lock lock(mutex_);

		asio::io_service::work work(io_service);

		NoOfThread = num;
		counter = 0;
		

		if(count == 0)
			count++;
		else
			return;
		
		for(int i=0 ; i<num ; ++i)
		{
			thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
		}
	}

	void static threadfun(std::shared_ptr<process> obj)
	{
		obj->run();
	}
	
	void runTask(std::shared_ptr<process>  obj)
	{
		if(counter++ < NoOfThread)
		{
			io_service.post(boost::bind(threadfun, obj));
		}
		else
		{
			cout<<"\n run out of threads \n";
		}
	}

	~ThreadPool()
	{
		io_service.stop();
		grp.join_all();
	}
};
int ThreadPool::count = 0;

int main( int argc, char * argv[] )
{
	
	ThreadPool pool(10);
	
	for(int i=0 ; i<20 ; ++i)
	{
		std::shared_ptr<process> obj(new processType1(i));
		pool.runTask(obj);
		
	}

	getchar();
	
	return 0;
}
That looks much better. I can see where you increment counter, but you don't appear to decrement it.
yes that the only problem ,where to decrement it ?
Shouldn't a thread pool be thread safe?

For instance std::atomic<int> counter ;, perhaps boost::lockfree::stack<thread*> thStk ; and so on.
well let me make it work first, later i will improve it.
yes that the only problem ,where to decrement it?
You need somewhat of knowing when the task is done.
wooo.. remove errors in my code.
well here is final code. works way i want;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113

#include <boost\asio.hpp>
#include <iostream>
#include <boost\thread\thread.hpp>
#include <boost\bind.hpp>
#include <boost\thread\mutex.hpp>
#include <stack>

using namespace std;
using namespace boost;

class process
{
public:
	virtual void run()=0;
	virtual ~process()=0
	{
	}
};

class processType1:virtual public process
{
	int id;
public:
	processType1(int i)
	{
		id = i;
	}
	virtual void run()
	{
		cout<<"long task task id "<<id<<"\n";
		Sleep(10000);
	}
	virtual ~processType1()
	{

	}
};

class ThreadPool
{
	static int Scount;
	int available;
	thread_group grp;
	mutex mutex_;
	asio::io_service io_service;
	asio::io_service::work work_;

public:
	ThreadPool(int num):work_(io_service)
	{
		mutex::scoped_lock lock(mutex_);
		
		available = num;
	
		if(Scount == 0)
			Scount++;
		else
			return;
		
		for(int i=0 ; i<num ; ++i)
		{
			grp.create_thread(boost::bind(&asio::io_service::run, &io_service));
		}
	}
	void runTask(std::shared_ptr<process>  obj)
	{
		if(available == 0)
		{
			cout<<"run out of thread wait \n";
			return;
		}
		--available;
		try
		{
			io_service.post(boost::bind(&ThreadPool::threadfun, this, obj));
		}
		catch(...)
		{
			cout<<"some error \n";
		}
	}
	
	~ThreadPool()
	{
		io_service.stop();
		grp.join_all();
	}

private:
	void threadfun(std::shared_ptr<process> obj)
	{
		obj->run();
		cout<<"task over \n";
		++available;
	}
};
int ThreadPool::Scount = 0;


int main( int argc, char * argv[] )
{
	ThreadPool pool(5);
	for(int i=0 ;  ; ++i)
	{
		std::shared_ptr<process> obj(new processType1(i));
		pool.runTask(obj);
		Sleep(500);
	}
	return 0;
}
	
What's the point of locking a mutex (lock_guard, btw) in the constructor when the mutex is your class's own non-static member? It's impossible for another thread to try to lock the same mutex.
Also, why does creating another threadpool (nonzero Scount) result in a pool that pretends to have a bunch of things available, but doesn't actually own any threads?

Also, it's either boost::this_thread::sleep(), you you need to #include something that provides Sleep()
oh, and there's a pure syntax error: ~process(); cannot have a body inside the class definition, it has to be outside, as "process::~process() {};"
Last edited on
thanks for taking interest.


i think ,i am trying to create one object only , i dont want another object, as of now i don't put any error message.
Scount will be 0 only.
i don't need to include anything for Sleep , i use visual studio 2010 express.

~process(); cannot have a body inside the class definition ?

well above code works and don't show any compiler \ syntax error

can u specify any specific compiler which shows error for same.
Last edited on
Topic archived. No new replies allowed.