Thread pool questions and advice.

Hey, I made a simple thread pool and, well, "it works", but I was wondering if there are things I should do or add to make it better. I feel like I'm missing something really important. By the way, the linked list that stores the jobs was written by me; it works the same with std::list, but I just felt like using my own linked list :P.

Threading.h file
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#pragma once

#ifndef THREADING_H
#define THREADING_H

#include <thread>
#include <functional>
#include <atomic>
#include <memory>
#include <list>
#include "linkedlist.h"

/**
 * Owns one background thread that calls a user-supplied function repeatedly.
 *
 * Lifecycle:
 *  - Assign() (or the converting constructor) starts the thread.
 *  - toggleThread(false) pauses the calls and toggleThread(true) resumes them;
 *    the thread itself stays alive while paused.
 *  - endObject() (also run by the destructor) stops the loop and joins the
 *    thread, so the thread can never outlive this object.
 *
 * The function runs on the background thread. endObject() and the destructor
 * must not be invoked from inside that function, because a thread cannot join
 * itself.
 */
struct active_object
{
	/*Original creator credit goes to JLBorges on the cplusplus forums, edits/additions were made by me*/

	/** Starts the background thread immediately; it calls fn in a loop. */
	template < typename FN > active_object(FN fn) { Assign(fn); }

	/** Creates an idle object: no thread exists until Assign() is called. */
	active_object() {

	}

	/** Stops and joins the thread (if any) before the members go away. */
	~active_object() { endObject(); }

	/**
	 * Asks the loop to finish and waits for the thread to exit.
	 * Safe to call repeatedly and on an object that never started a thread.
	 */
	void endObject() {
		alive = false;
		// join() on a non-joinable thread throws std::system_error, so guard it.
		if (thread.joinable()) {
			thread.join();
		}
	}

	/**
	 * Pauses (state == false) or resumes (state == true) the repeated calls.
	 * The thread is kept alive while paused so it can be resumed later.
	 */
	void toggleThread(bool state) {
		running = state;
	}

	/**
	 * Starts (or restarts) the background thread with a new function.
	 * Any thread that is still running is stopped and joined first, because
	 * assigning to a joinable std::thread calls std::terminate.
	 */
	template< typename Function > void Assign(Function fn) {
		endObject();
		alive = true;
		thread = std::thread([this, fn] {
			while (alive) {
				if (running) {
					fn();
				}
				else {
					// Paused: back off briefly instead of spinning on the flag.
					std::this_thread::sleep_for(std::chrono::milliseconds(1));
				}
			}
		});
	}
	active_object(const active_object&) = delete;
	active_object(active_object&&) = delete;
	active_object& operator= (active_object) = delete;

private:
	std::atomic<bool> alive{ true };    // false => the loop exits and the thread ends
	std::atomic<bool> running{ true };  // false => the loop idles without calling fn
	std::thread thread;
};

/**
 * Job dispatcher: callers queue std::function<void()> jobs with postJob(), and
 * assignWhenFree() hands queued jobs to Worker slots.
 *
 * NOTE(review): assignWhenFree() starts every job on a new std::thread and
 * joins it straight away, so jobs execute one at a time rather than in
 * parallel, and no thread is reused between jobs.
 *
 * NOTE(review): JobPostings is modified by both postJob() and
 * assignWhenFree() without any locking in this class. The constructor hands
 * assignWhenFree() to Queue's thread, so unless linkedlist synchronizes
 * internally this is a data race -- confirm and guard with a mutex.
 */
class Threading {
private:
	// Number of Worker slots in `employees`; set by the constructor.
	const int coreAmount;

	// One slot that can hold a running job thread.
	struct Worker {
		// Brace-init: `std::atomic_bool x = true;` is ill-formed before C++17.
		std::atomic_bool isReady{ true };
		std::thread Job;

		// Marks the slot free and waits for its thread, if one is still joinable.
		void endTask() {
			isReady = true;
			// join() throws std::system_error on a non-joinable thread.
			if (Job.joinable()) {
				Job.join();
			}
		}
	};
	Worker* employees;
	linkedlist<std::function<void()> > JobPostings;
	active_object Queue;
public:
	Threading();
	~Threading();

	// Re-marks as ready every worker whose thread is no longer joinable.
	void freeWorkers() {
		for (int worker = 0; worker < coreAmount; worker++) {
			if (!employees[worker].Job.joinable()) {
				employees[worker].isReady = true;
			}
		}
	}

	/**
	 * One dispatch pass: gives each ready worker the job at the front of the
	 * queue, or switches Queue off when there is nothing to do.
	 */
	void assignWhenFree() {
		if (!JobPostings.is_empty()) {
			// Re-check the queue on every iteration: it can run dry mid-pass,
			// and front()/pop_front() must not be called on an empty list.
			for (int worker = 0; worker < coreAmount && !JobPostings.is_empty(); worker++) {
				if (employees[worker].isReady) {
					employees[worker].Job = std::thread(JobPostings.front());
					employees[worker].isReady = false;
					employees[worker].Job.join();
					JobPostings.pop_front();
				}
			}
		}
		else {
			Queue.toggleThread(false);
		}
		freeWorkers();
	}

	// Waits for every worker's thread and marks all workers ready.
	void endAll() {
		for (int worker = 0; worker < coreAmount; worker++) {
			employees[worker].endTask();
		}
	}

	// Appends a job to the queue and switches Queue back on.
	void postJob(std::function<void()> Job) {
		JobPostings.addItemEnd(Job);
		Queue.toggleThread(true);
	}

};
#endif 


Threading.cpp file
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include "Threading.h"



/**
 * Sizes the worker table from the reported hardware thread count and starts
 * the dispatcher loop on Queue.
 *
 * std::thread::hardware_concurrency() is allowed to return 0 when the count is
 * unknown; fall back to a single worker so posted jobs can still be run.
 */
Threading::Threading()
	: coreAmount(std::thread::hardware_concurrency() > 0
		? static_cast<int>(std::thread::hardware_concurrency())
		: 1)
{
	employees = new Worker[coreAmount];
	// Each pass dispatches queued jobs, then sleeps briefly to avoid a hot spin.
	Queue.Assign([this] { assignWhenFree(); std::this_thread::sleep_for(std::chrono::milliseconds(1)); });
}


/**
 * Waits for the workers and releases the worker table.
 *
 * NOTE(review): the Queue member is destroyed only after this body finishes,
 * so its thread may still be inside assignWhenFree() while `employees` is
 * being freed. Stop that thread before releasing the table.
 */
Threading::~Threading()
{
	endAll();
	// `employees` comes from new Worker[...], so it must be released with delete[].
	delete[] employees;
}


main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <random>
#include "Threading.h"

using namespace std;
// Demo job: writes its fixed completion message to standard output.
void func1()
{
	static const char message[] = "Func1 finished";
	std::cout << message;
}
// Demo job: writes its fixed completion message to standard output.
void func2()
{
	static const char message[] = "Func2 finished";
	std::cout << message;
}
// Demo job: writes its fixed completion message to standard output.
void func3()
{
	static const char message[] = "Func3 finished";
	std::cout << message;
}
// Placeholder that accepts a callable by value and never invokes it.
void test(std::function<void()> use)
{
	static_cast<void>(use);
}
// Demo driver: reports the hardware thread count, waits one second, then
// posts randomly chosen demo jobs to a Threading instance forever.
int main() {
	std::cout << "You have " << std::thread::hardware_concurrency() << " threads";
	std::cout << "\nWill begin jobs in 1 second";
	std::this_thread::sleep_for(std::chrono::seconds(1));

	std::uniform_int_distribution<int> func_call(0, 2);
	std::mt19937 engine(std::chrono::system_clock::now().time_since_epoch().count());
	Threading pool;

	// Indexed by the value drawn from func_call (always 0, 1 or 2).
	void (*const jobs[])() = { &func1, &func2, &func3 };

	for (;;) {
		const int pick = func_call(engine);
		pool.postJob(std::bind(jobs[pick]));
		std::cout << '\n';
	}

	// The loop above never exits, so these statements are not reached.
	std::cin.ignore();
	std::cin.get();
}
I was wondering if there were things I should do or add to make it better?

This isn't actually a "thread pool", since it does not use pooled threads.

As far as I can see, here each task is executed in a new (not pooled) thread (line 68) and no new tasks are launched until that task completes and its thread shuts down at line 70.

Also, a major red flag is the detach for that useless thread from active_object: the thread can be in that loop around line 68 when the main thread executes delete employees;. In fact, even if it were joined in ~active_object, it would still be live after delete employees;, since the Queue member destructor doesn't run until after that line.
@Cubbi
Really sorry for the late response...

As far as I can see, here each task is executed in a new (not pooled) thread (line 68) and no new tasks are launched until that task completes and its thread shuts down at line 70.


Yes, I'm just storing the tasks for the threads to process later. Which, I guess you're right, isn't a thread pool. In that case, what should this "thing" be called?

Oh, and I also changed the thread.join at line 70 to thread.detach, since from my understanding detach lets the thread run on its own without the caller having to wait for it, so that more threads can be started. Although, thinking about it, I'm not sure that is a good thing, mainly because it could make the code no longer thread-safe.

Edited h file
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#pragma once

#ifndef THREADING_H
#define THREADING_H

#include <thread>
#include <functional>
#include <atomic>
#include <memory>
#include <list>
#include "linkedlist.h"

/**
 * Owns one background thread that calls a user-supplied function repeatedly.
 *
 * Lifecycle:
 *  - Assign() (or the converting constructor) starts the thread.
 *  - toggleThread(false) pauses the calls and toggleThread(true) resumes them;
 *    the thread itself stays alive while paused, so a later resume works.
 *  - endObject() (also run by the destructor) stops the loop and joins the
 *    thread, so the thread can never outlive this object.
 *
 * The function runs on the background thread. endObject() and the destructor
 * must not be invoked from inside that function, because a thread cannot join
 * itself.
 */
struct active_object
{
	/*Original creator credit goes to JLBorges on the cplusplus forums, edits/additions were made by me*/

	/** Starts the background thread immediately; it calls fn in a loop. */
	template < typename FN > active_object(FN fn) { Assign(fn); }

	/** Creates an idle object: no thread exists until Assign() is called. */
	active_object() {

	}

	/** Stops and joins the thread (if any) before the members go away. */
	~active_object() { endObject(); }

	/**
	 * Asks the loop to finish and waits for the thread to exit.
	 * Safe to call repeatedly and on an object that never started a thread.
	 */
	void endObject() {
		alive = false;
		if (thread.joinable()) {
			thread.join();
		}
	}

	/**
	 * Pauses (state == false) or resumes (state == true) the repeated calls.
	 * Pausing uses its own flag: clearing `alive` would end the thread, and a
	 * finished thread cannot be resumed.
	 */
	void toggleThread(bool state) {
		running = state;
	}

	/**
	 * Starts (or restarts) the background thread with a new function.
	 * Any thread that is still running is stopped and joined first, because
	 * assigning to a joinable std::thread calls std::terminate.
	 */
	template< typename Function > void Assign(Function fn) {
		endObject();
		alive = true;
		thread = std::thread([this, fn] {
			while (alive) {
				if (running) {
					fn();
				}
				else {
					// Paused: back off briefly instead of spinning on the flag.
					std::this_thread::sleep_for(std::chrono::milliseconds(1));
				}
			}
		});
	}
	active_object(const active_object&) = delete;
	active_object(active_object&&) = delete;
	active_object& operator= (active_object) = delete;

private:
	std::atomic<bool> alive{ true };    // false => the loop exits and the thread ends
	std::atomic<bool> running{ true };  // false => the loop idles without calling fn
	std::thread thread;
};

/**
 * Job dispatcher: callers queue std::function<void()> jobs with postJob(), and
 * assignWhenFree() hands queued jobs to Worker slots.
 *
 * NOTE(review): assignWhenFree() starts every job on a new std::thread and
 * joins it straight away, so jobs execute one at a time rather than in
 * parallel, and no thread is reused between jobs.
 *
 * NOTE(review): JobPostings is modified by both postJob() and
 * assignWhenFree() without any locking in this class. The constructor hands
 * assignWhenFree() to Queue's thread, so unless linkedlist synchronizes
 * internally this is a data race -- confirm and guard with a mutex.
 */
class Threading {
private:
	// Number of Worker slots in `employees`; set by the constructor.
	const int coreAmount;

	// One slot that can hold a running job thread.
	struct Worker {
		// Brace-init: `std::atomic_bool x = true;` is ill-formed before C++17.
		std::atomic_bool isReady{ true };
		std::thread Job;

		// Marks the slot free and waits for its thread, if one is still joinable.
		void endTask() {
			isReady = true;
			if (Job.joinable()) {
				Job.join();
			}
		}
	};
	Worker* employees;
	linkedlist<std::function<void()> > JobPostings;
	active_object Queue;
public:
	Threading();
	~Threading();

	// Re-marks as ready every worker whose thread is no longer joinable.
	void freeWorkers() {
		for (int worker = 0; worker < coreAmount; worker++) {
			if (!employees[worker].Job.joinable()) {
				employees[worker].isReady = true;
			}
		}
	}

	/**
	 * One dispatch pass: gives each ready worker the job at the front of the
	 * queue; once the queue is empty it switches Queue off and stops early.
	 */
	void assignWhenFree() {
		for (int worker = 0; worker < coreAmount; worker++) {
			if (JobPostings.is_empty()) {
				Queue.toggleThread(false);
				// Leave the loop but still fall through to freeWorkers():
				// returning here would leave the workers used earlier in
				// this pass flagged as busy.
				break;
			}
			if (employees[worker].isReady) {
				employees[worker].Job = std::thread(JobPostings.front());
				employees[worker].isReady = false;
				employees[worker].Job.join();
				JobPostings.pop_front();
			}
		}

		freeWorkers();
	}

	// Waits for every worker's thread and marks all workers ready.
	void endAll() {
		for (int worker = 0; worker < coreAmount; worker++) {
			employees[worker].endTask();
		}
	}

	// Appends a job to the queue and switches Queue back on.
	void postJob(std::function<void()> Job) {
		JobPostings.addItemEnd(Job);
		Queue.toggleThread(true);
	}

};
#endif 


Also, a major red flag is the detach for that useless thread from active_object: the thread can be in that loop around line 68 when the main thread executes delete employees;. In fact, even if it were joined in ~active_object, it would still be live after delete employees;, since the Queue member destructor doesn't run until after that line.


Yeah I fixed it by ending the queue first.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include "Threading.h"



/**
 * Sizes the worker table from the reported hardware thread count and starts
 * the dispatcher loop on Queue.
 *
 * std::thread::hardware_concurrency() is allowed to return 0 when the count is
 * unknown; fall back to a single worker so posted jobs can still be run.
 */
Threading::Threading()
	: coreAmount(std::thread::hardware_concurrency() > 0
		? static_cast<int>(std::thread::hardware_concurrency())
		: 1)
{
	employees = new Worker[coreAmount];
	// Each pass dispatches queued jobs, then sleeps briefly to avoid a hot spin.
	Queue.Assign([this] { assignWhenFree(); std::this_thread::sleep_for(std::chrono::milliseconds(1)); });
}


/**
 * Shuts the dispatcher down in dependency order: stop the Queue thread first
 * (it reads `employees`), then wait for the workers, then free the table.
 */
Threading::~Threading()
{
	Queue.endObject();
	endAll();
	// `employees` comes from new Worker[...], so it must be released with delete[].
	delete[] employees;
}
In that case what should this "thing" be.

given that it runs one task at a time (Job.join waits for the thread you spawned to run JobPostings.front() to terminate):
1
2
3
					employees[worker].Job = std::thread(JobPostings.front());
					employees[worker].isReady = false;
					employees[worker].Job.join();

It's somewhat like a turnstile with an unbounded job queue... except turnstiles don't spawn threads.
Last edited on

It's somewhat like a turnstile with unbounded job queue.. except turnstiles don't spawn threads

At least not yet lol. Seriously though is it a good thing that it's like a turnstile. Maybe I should probably fix this up and work on an "actual" thread-pool later hmm.
Topic archived. No new replies allowed.