boost thread pool execution

I'm trying to use a Boost thread group to execute various sql statements on different relational databases.

The code below does not perform the update as expected, yet when the function is run outside the thread pool, it executes as expected.


#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <SQLAPI/include/SQLAPI.h>

class thread_pool {
  private:
    boost::asio::io_service ios;
    boost::asio::io_service::work work_ios;
    boost::thread_group thread_grp;
    unsigned short threads_free;
    boost::mutex mx;
  public:

    // Constructor.
    thread_pool( int pool_size = 0 ) : work_ios( ios ) {
      if ( pool_size <= 0 )
        pool_size = boost::thread::hardware_concurrency()*2; // default to double the number of cores
      threads_free = pool_size;
      for ( int i = 0; i < pool_size; ++i )
        thread_grp.create_thread( boost::bind( &boost::asio::io_service::run, &ios ) );
    }

    ~thread_pool() {
      ios.stop(); // Ensure all threads in ios::run() are stopped
      try { thread_grp.join_all(); }
      catch ( const std::exception& ) {}
    }

    // if thread is free, add a job
    template < typename Job >
    void run_job( Job job ) {
      boost::unique_lock< boost::mutex > lock( mx );
      if ( 0 == threads_free ) return; // exit if no available threads
      --threads_free; // Decrement thread count
      ios.dispatch(boost::bind( &thread_pool::wrapper, this, boost::function< void() >( job ) ));
      // ios.post( boost::bind( &thread_pool::wrapper, this, boost::function< void() >( job ) ) );
    }

  private:
    // Called from run_job
    //  Wrap the job and ensure thread free count is incremented safely
    void wrapper( boost::function< void() > job ) {
      try { job(); } // executes the job
      catch ( const std::exception& ) {}

      // increment available threads, once job finished
      boost::unique_lock< boost::mutex > lock( mx );
      ++threads_free;
    }
};

long exec_sql(const std::string & sql) {
    SAConnection con; // connection object
    long ret{0};
    try {
      con.Connect("localhost,5432@ft_node", "bluefrog", "bluefrog", SA_PostgreSQL_Client);
      con.setAutoCommit(SA_AutoCommitOff);
      con.setIsolationLevel(SA_Serializable);
      SACommand cmd1(&con, sql.c_str());
      cmd1.Execute();

      ret = cmd1.RowsAffected();
      con.Commit();
    }
    catch(SAException &x) {
      std::cout << (const char*)x.ErrText() << "\n";
    }

    con.Disconnect();
    return ret;
}

int main () {

  // long ret = exec_sql("update test1 set y = 'Yellow' where x in(2,3,4)");  
  // std::cout << "return " << ret << "\n";
    thread_pool tp{4};
    tp.run_job( boost::bind( exec_sql, std::string("update test1 set y = 'Magenta' where x in(2,3,4)")  ) );

  return 0;
};

Can anybody suggest what the problem might be?
The problem might be that the execution of the database command is not thread safe.

Also, exec_sql(...) does in fact always connect to the same database.
I've simplified the code and removed the function that connected to the database.
Instead, an unordered map is updated within each of four callable objects. Each callable object uses a unique key, 1 to 4, to update the map, so there should be no conflicts.

I am performing an explicit shutdown by calling "stop_service", which performs a join_all and then explicitly stops the io_service object (ios).

I expected the map to have four string entries, but the output varies from one execution to another.

#include <boost/asio.hpp>
#include <string>
#include <unordered_map>
#include <iostream>
#include <boost/thread.hpp>

class thread_pool {
private:
  boost::asio::io_service ios;
  boost::asio::io_service::work work_ios;
  boost::thread_group thread_grp;
  unsigned short threads_free;
  boost::mutex mx;
public:

   thread_pool( int pool_size = 0 ) : work_ios( ios ) {
      if ( pool_size <= 0 )
        pool_size = boost::thread::hardware_concurrency()*2; // default to double the number of cores
      threads_free = pool_size;
      for ( int i = 0; i < pool_size; ++i )
        thread_grp.create_thread( boost::bind( &boost::asio::io_service::run, &ios ) );
    }

  ~thread_pool() {
    try  { thread_grp.join_all(); }
    catch ( const std::exception& ) {}
    ios.stop();
  }

  void stop_service() {
    ios.stop();
    try  { thread_grp.join_all(); }
    catch ( const std::exception& ) {}
  }

  template < typename Job >
  void run_job( Job job ) {
    boost::unique_lock< boost::mutex > lock( mx );

    if ( 0 == threads_free ) return;
    --threads_free;

    ios.post( boost::bind( &thread_pool::wrap_job, this, boost::function< void() >(job) ));
  }

private:
  void wrap_job( boost::function< void() > job ) {
    try { job(); }
    catch ( const std::exception& ) {}

    boost::unique_lock< boost::mutex > lock( mx );
    ++threads_free;
  }
};

std::unordered_map<int, std::string> ht;

void work() { 
   std::cout << "work()" << std::endl;
   ht[1] = "work done";
};

struct worker {
  void operator()() { 
    std::cout << "struct work" << std::endl;
    ht[2] = "worker done";
  };
  
};

void more_work( int ) {
   std::cout << "more_work( int )" << std::endl;
   ht[3] = "more_work done";
};

void do_try(int i) {
  try { 
    std::cout << "In try "<< "\n"; 
    ht[4] = "do_try done";
  }
  catch (std::exception &x) { }
}

int main () {
  
  thread_pool tp{4};
  
  tp.run_job( work ); 
  tp.run_job( worker() );
  tp.run_job( boost::bind( more_work, 5 ) );
  tp.run_job( boost::bind( do_try, 6 ) );

  tp.stop_service();
  
  for ( auto it = ht.begin(); it != ht.end(); ++it )
    std::cout << " " << it->first << ":" << it->second << "\n";
  
  std::cout << std::endl;
  return 0;  
};


output:
$ ./thread_pool_test2
more_work( int )
work()
In try 
struct work
 1:work done
 2:worker done
 4:do_try done
 3:more_work done

$ ./thread_pool_test2
more_work( int )struct work

work()
 1:work done
 2:worker done
 3:more_work done

So for the first execution the unordered map shows all 4 entries, but for the second it does not.
Can anybody explain the behaviour, given that a join_all is performed?
I would think that those threads are somehow canceled before they actually run your code.

yes, they are most certainly cancelled before they complete.
A thread_grp.join_all should prevent that, but if I modify stop_service from this:
  void stop_service() {
    ios.stop();
    try  { thread_grp.join_all(); }
    catch ( const std::exception& ) {}
  }

to this
  void stop_service() {
    try  { thread_grp.join_all(); }
    catch ( const std::exception& ) {}
    ios.stop();
  }


then join_all hangs.
Why does join_all hang though?
Why does join_all hang though?

because you gave io_service a work item in the constructor, and it has not yet been destroyed. run() does not exit as long as there is work.

(also, what's up with unprotected concurrent modification of ht?)
Why does join_all hang though?
Because ios.stop(); is required to stop io_service::run. Otherwise the service simply keeps running.

What you can do is introduce another variable like threads_running, which is incremented at the beginning of wrap_job(). In stop_service() you sleep for a while (say 100 ms) in a loop until the required number of threads are running. Make threads_running atomic, so it doesn't need a lock.

thanks, I've included a
boost::this_thread::sleep_for(boost::chrono::seconds{5});
in stop_service and the hash table entries appear as expected on each execution.
Whilst including the delay is not foolproof, at least it proves that the updating can occur.

There is currently no member function in boost::thread_group that indicates whether threads are currently running or not.

I plan to use the thread pool class to insert into and update two different databases on separate servers.


Hi coder777,

I am attempting to do what you suggested, or at least I hope so.
I've therefore introduced an atomic counter for running threads, as follows:
class thread_pool {
private:
  boost::asio::io_service ios;
  boost::asio::io_service::work work_ios;
  boost::thread_group thread_grp;
  unsigned short threads_free;
  boost::mutex mx;
  static std::atomic<int> t = std::atomic<int>(0);
public:

   thread_pool( int pool_size = 0 ) : work_ios( ios ) {
      if ( pool_size <= 0 )
        pool_size = boost::thread::hardware_concurrency()*2;
      threads_free = pool_size;
      for ( int i = 0; i < pool_size; ++i )
        thread_grp.create_thread( boost::bind( &boost::asio::io_service::run, &ios ) );
    }


wrap_job has been modified to include an increment and a decrement
  void wrap_job( boost::function< void() > job ) {
    try { t++; 
           job(); 
           t--; }
    catch ( const std::exception& ) {}

    boost::unique_lock< boost::mutex > lock( mx );
    ++threads_free;
  }


so stop_service now has an arbitrary delay, based on the number of threads.
So is that the kind of thing you had in mind regarding "threads_running"?

My only problem now is that I cannot initialise a static atomic:
use of deleted function 'std::atomic<int>::atomic(const...

Any suggestions where and how I can initialise the atomic?
If the counter is not atomic, will that not compromise thread safety?
FWIW this edit to the original example works for me:

  void stop_service() {
    work_ios.reset();       // allow the worker threads to exit when done
    thread_grp.join_all(); // wait for the worker threads to finish work
  }

where work_ios was changed to std::unique_ptr<boost::asio::io_service::work> rather than a direct member, and of course lock_guards were added around all the ht[x] = assignments.
thanks for the tip on resetting the work object. Really helpful.

With regard to the hash table updates:

The documentation states:
Different elements in the same container can be modified concurrently by different threads, except for the elements of std::vector<bool> (for example, a vector of std::future objects can be receiving values from multiple threads).


So, for the above case, if the key is guaranteed to be unique, then a hash table is an ideal candidate as a container, and thread safe (based on key usage) without using a mutex.

bluefrog wrote:
if the key is guaranteed to be unique, then a hash table is an ideal candidate as a container, and thread safe (based on key usage) without using a mutex.

A hash table is not an array: unordered_map::operator[] can rebuild the table and invalidate every iterator. Actually, even without a rehash, concurrent insert would be a data race on a collision, as you'd be modifying the same linked list.
In other words, you're not modifying elements, you are inserting them.

If the key was unique, would you in fact end up with collisions?
If not, presumably there would be no linked list to accommodate contentious "buckets", only an associative key and bucket.

In addition, reserve ensures that the buckets are pre-allocated. Also, buckets are moved, not copied, if a resize ever occurs.
Lastly, I always use the key for access, so no iteration is ever performed over the hash table.

I may change my mind, however; I'll be monitoring collisions based on bucket_count and bucket_size.
bluefrog wrote:
If the key was unique, would you in fact end up with collisions?

Yes. Guaranteed if the number of buckets is less than the number of distinct keys; possible, with decreasing probability, as the number of buckets grows.

Also, buckets are moved around, not copied if a resize does ever occur

Nodes are what's moved. Attempting to access the map concurrently with that is a data race - there's no telling what your operator[] or .get or iterator dereference would possibly see! You could modify elements concurrent with rehash if you preserved pointers/references (not iterators) to those elements before the rehash began, though.

I always use the key for access, so no iteration is ever performed over a hash table.

Using a key for access is a data race against unordered_map insert. Once you've built the map (and synchronized the last insert with the first access by key), then sure, read and modify your existing elements, by key or by iteration or however you want - the cppreference quote would apply then.

I only listed the first few reasons why this is invalid that came to mind. The proper approach is to start with code that does not violate container thread-safety guarantees, and then see if there are optimization opportunities with alternatives.

Ok, I surrender; I've introduced a shared_mutex and shared_lock.

So is this the correct way to protect the hash table update?

#include <boost/asio.hpp>
#include <string>
#include <boost/date_time.hpp>
#include <unordered_map>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <memory>
#include <iostream>


class thread_pool {
private:
  boost::asio::io_service ios;
  std::unique_ptr<boost::asio::io_service::work> work_ios;
  boost::thread_group thread_grp;
  int threads_free;
  boost::mutex mx;
public:

   thread_pool( int pool_size = 0 ) : work_ios{std::make_unique<boost::asio::io_service::work>(ios)} {
   //thread_pool( int pool_size = 0 ) : work_ios{ios}, threads_free{pool_size} {
      if ( pool_size <= 0 )
        pool_size = boost::thread::hardware_concurrency()*2;
      threads_free = pool_size;
      for ( int i = 0; i < pool_size; ++i )
        thread_grp.create_thread( boost::bind( &boost::asio::io_service::run, &ios ) );
    }

  ~thread_pool() {
    ios.stop();
    try  { thread_grp.join_all(); }
    catch ( const std::exception& ) {}
  }

  void stop_service() {
    // boost::this_thread::sleep_for(boost::chrono::seconds{3});
    work_ios.reset();      // release the work guard so run() can return once queued jobs finish
    thread_grp.join_all(); // wait for the worker threads to exit
  }

  template < typename Job >
  void run_job( Job job ) {
    boost::unique_lock< boost::mutex > lock( mx );

    if ( 0 == threads_free ) return;
    --threads_free;

    ios.post( boost::bind( &thread_pool::wrap_job, this, boost::function< void() >(job) ));
    // ios.dispatch( boost::bind( &thread_pool::wrap_job, this, boost::function< void() >(job) ));
  }

private:
  void wrap_job( boost::function< void() > job ) {
    try { job(); }
    catch ( const std::exception& ) {}

    boost::unique_lock< boost::mutex > lock( mx );
    ++threads_free;
  }
};


std::unordered_map<int, std::string> ht;
boost::shared_mutex mx;

void func1() { 
  boost::shared_lock<boost::shared_mutex> lk(mx);
  ht[1] = "func1 done";
};

struct func2 {
  void operator()() { 
    boost::shared_lock<boost::shared_mutex> lk(mx);
    ht[2] = "func2 done"; 
  };
};

void func3( int ) { 
  boost::shared_lock<boost::shared_mutex> lk(mx);
  ht[3] = "func3 done"; 
};

void func4(int i) {
  try {     
    boost::shared_lock<boost::shared_mutex> lk(mx);
    ht[4] = "func4 done"; 
  }
  catch (std::exception &e) { }
}

int main () {
  
  thread_pool tp{4};
  
  tp.run_job( func1 ); 
  tp.run_job( func2() );
  tp.run_job( boost::bind( func3, -21 ) );
  tp.run_job( boost::bind( func4, 99 ) );

  tp.stop_service();
  
  for ( auto it = ht.begin(); it != ht.end(); ++it )
    std::cout << " " << it->first << ":" << it->second << "\n";
  
  std::cout << std::endl;
  return 0;  
};


Unfortunately, execution occasionally results in undefined behaviour.
So mostly it runs as expected, but occasionally an error typical of undefined behaviour occurs, for example:
$ ./db_service 
 4:func4 done
 3:func3 done
 2:func2 done
 1:func1 done

$ ./db_service 
 4:func4 done
 2:func2 done
 1:func1 done
 3:func3 done

$ ./db_service 
*** Error in `./db_service': double free or corruption (fasttop): 0x00007fe7c4000900 ***
======= Backtrace: =========
/lib/x86_64-linux-gnu/libc.so.6(+0x7908b)[0x7fe7ce77c08b]
/lib/x86_64-linux-gnu/libc.so.6(+0x82c3a)[0x7fe7ce785c3a]
/lib/x86_64-linux-gnu/libc.so.6(cfree+0x4c)[0x7fe7ce789d2c]
./db_service(+0x18b3c)[0x562be3a81b3c]
./db_service(+0x17fa6)[0x562be3a80fa6]
./db_service(+0x171f9)[0x562be3a801f9]
..
..
..
/usr/lib/x86_64-linux-gnu/libboost_thread.so.1.62.0(+0x11576)[0x7fe7cf27e576]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x76da)[0x7fe7ce2e46da]
/lib/x86_64-linux-gnu/libc.so.6(clone+0x5f)[0x7fe7ce80bd7f]
======= Memory map: ========
562be3a69000-562be3a91000 r-xp 00000000 08:02 13255934                   db_service
562be3c90000-562be3c92000 r--p 00027000 08:02 13255934                   db_service
562be3c92000-562be3c93000 rw-p 00029000 08:02 13255934                   db_service
562be5b55000-562be5b87000 rw-p 00000000 00:00 0                          [heap]
7fe7ac000000-7fe7ac021000 rw-p 00000000 00:00 0 
..
..
7fe7cd7d4000-7fe7cdfd4000 rw-p 00000000 00:00 0 
7fe7cdfd4000-7fe7ce0dc000 r-xp 00000000 08:02 29622451                   /lib/x86_64-linux-gnu/libm-2.24.so
..
..
7fe7ce2dd000-7fe7ce2f5000 r-xp 00000000 08:02 29622462                   /lib/x86_64-linux-gnu/libpthread-2.24.so
..
..
7fe7ce4f6000-7fe7ce4f7000 rw-p 00019000 08:02 29622462                   /lib/x86_64-linux-gnu/libpthread-2.24.so
7fe7ce4f7000-7fe7ce4fb000 rw-p 00000000 00:00 0 
7fe7ce4fb000-7fe7ce502000 r-xp 00000000 08:02 29622464                   /lib/x86_64-linux-gnu/librt-2.24.so
..
..
..
7fe7ceac6000-7fe7ceaca000 rw-p 00000000 00:00 0 
7fe7ceaca000-7fe7ceae0000 r-xp 00000000 08:02 29626872                   /lib/x86_64-linux-gnu/libgcc_s.so.1
..
..
7fe7cece1000-7fe7cee5a000 r-xp 00000000 08:02 17311155                   /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22
7fe7cee5a000-7fe7cf059000 ---p 00179000 08:02 17311155                   /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22
7fe7cf059000-7fe7cf063000 r--p 00178000 08:02 17311155                   /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22
7fe7cf063000-7fe7cf065000 rw-p 00182000 08:02 17311155                   /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22
7fe7cf065000-7fe7cf069000 rw-p 00000000 00:00 0 
7fe7cf069000-7fe7cf06c000 r-xp 00000000 08:02 17310059                   /usr/lib/x86_64-linux-gnu/libboost_system.so.1.62.0
..
..
7fe7cf493000-7fe7cf494000 rw-p 00026000 08:02 17310060                   /usr/lib/x86_64-linux-gnu/libboost_thread.so.1.62.0
7fe7cf494000-7fe7cf4ba000 r-xp 00000000 08:02 29622443                   /lib/x86_64-linux-gnu/ld-2.24.so
7fe7cf690000-7fe7cf696000 rw-p 00000000 00:00 0 
7fe7cf6b5000-7fe7cf6b9000 rw-p 00000000 00:00 0 
7fe7cf6b9000-7fe7cf6ba000 r--p 00025000 08:02 29622443                   /lib/x86_64-linux-gnu/ld-2.24.so
7fe7cf6ba000-7fe7cf6bb000 rw-p 00026000 08:02 29622443                   /lib/x86_64-linux-gnu/ld-2.24.so
7fe7cf6bb000-7fe7cf6bc000 rw-p 00000000 00:00 0 
7ffcc45af000-7ffcc45d0000 rw-p 00000000 00:00 0                          [stack]
7ffcc45d5000-7ffcc45d7000 r--p 00000000 00:00 0                          [vvar]
7ffcc45d7000-7ffcc45d9000 r-xp 00000000 00:00 0                          [vdso]
ffffffffff600000-ffffffffff601000 r-xp 00000000 00:00 0                  [vsyscall]
Aborted 
Cubbi wrote:
FWIW this edit to the original example works for me:
Well, destroying the work object will stop the processing. So isn't it actually the same as calling ios.stop(); directly?

@bluefrog
wrap_job has been modified to include an increment and a decrement
No decrement, that would defeat the purpose. So just increment, and after thread_grp.join_all(); set it to 0.
And don't make it static.

My only problem now is that I cannot initialise a static atomic .
Set it to 0 in the thread_pool constructor.

So is this the correct way to protect the hash table update ?
Why don't you just use an array? Since each thread uses a different index there are no race conditions. unordered_map is certainly not thread safe.

Why do you use io_service at all? Seems like you are doing nothing with it.

I've removed the increment/decrement from wrap_job. My previous post did not include it. I've also removed the atomic.

You mentioned "Why do you use io_service at all? Seems like you are doing nothing with it."

A thread is launched with:
thread_grp.create_thread( boost::bind( &boost::asio::io_service::run, &ios ) );
A thread generally runs until it completes or is terminated.

To allow a thread to run multiple callable objects while it stays alive, it runs functions read from a queue, hence io_service::run(). io_service::run(), however, stops running when there is no more work. To prevent run() from returning, io_service::work is used. The Boost documentation explains this in more detail.

Callable objects are placed onto the queue by either dispatch or post.
Refer to this note, https://stackoverflow.com/questions/2326588/boost-asio-io-service-dispatch-vs-post, for the difference between the two.
So ios.post( boost::bind( &thread_pool::wrap_job, this, boost::function< void() >(job) )); places the job onto the queue, and the method &thread_pool::wrap_job runs it.

The reason I am using an unordered_map is that it is an associative container; an array would not be suitable for the application I have in mind.
So I have a faint idea why you think you need io_service. Actually you don't:
#include <string>
#include <thread>
#include <memory>
#include <vector>
#include <unordered_map>
#include <mutex>
#include <iostream>



std::unordered_map<int, std::string> ht;
std::mutex mx;

void func1() { 
  std::lock_guard<std::mutex> lk(mx);
  ht[1] = "func1 done";
};

struct func2 {
  void operator()() { 
    std::lock_guard<std::mutex> lk(mx);
    ht[2] = "func2 done"; 
  };
};

void func3( int ) { 
  std::lock_guard<std::mutex> lk(mx);
  ht[3] = "func3 done"; 
};

void func4(int i) {
  try {     
    std::lock_guard<std::mutex> lk(mx);
    ht[4] = "func4 done"; 
  }
  catch (std::exception &e) { }
}

int main () {

  std::vector<std::thread> thread_vector;

  thread_vector.emplace_back( func1 ); 
  thread_vector.emplace_back( func2() );
  thread_vector.emplace_back( std::bind( func3, -21 ) );
  thread_vector.emplace_back( std::bind( func4, 99 ) );

  for(std::thread &t : thread_vector)
  {
      t.join();
  }
  
  for ( auto it = ht.begin(); it != ht.end(); ++it )
    std::cout << " " << it->first << ":" << it->second << "\n";
  
  std::cout << std::endl;
  return 0;  
};
 3:func3 done
 2:func2 done
 4:func4 done
 1:func1 done
I don't think you need a thread pool.

To control the lifetime of a thread, a condition_variable is usually the way to go:

http://www.cplusplus.com/reference/condition_variable/condition_variable/

an array would not be suitable for the application in mind.
That sounds overly complicated...
coder777 wrote:
Cubbi wrote:
FWIW this edit to the original example works for me:
Well, destructing the service object will stop the processing. So isn't it actually the same as calling ios.stop(); directly?

The way I read the original post, the goal was to wait for all posted jobs to complete, and then shut down the thread pool. If it's okay to abort the posted jobs, then yes, ios.stop or the destructor is all that's needed.


bluefrog wrote:
boost::shared_lock<boost::shared_mutex> lk(mx);
ht[4] = "func4 done";

A shared lock is for reading; you're writing, so you need a unique_lock or simply a lock_guard, as in coder777's example.

thanks for the code. The reason I did not use a vector of threads is that the Boost thread group offers simpler management.
In addition, if I were to use a vector of threads, then I would need to create and manage a thread-safe queue to distribute jobs amongst the threads.

Using the boost thread group along with io_service offers the above in a much simpler way.

You mentioned "the goal was to wait for all posted jobs to complete". That is indeed the goal, hence I was appreciative of Cubbi's suggestion of using work_ios.reset();, which I've included in the stop_service method (as per the previous code post), but forgot to include in the destructor, which I now have.

Regarding the shared_lock, the boost documentation states:
"shared ownership as well as exclusive ownership. This is the standard multiple-reader / single-write model: at most one thread can have exclusive ownership, and if any thread does have exclusive ownership, no other threads can have shared or exclusive ownership. Alternatively, many threads may have shared ownership."

So can one not write under exclusive ownership, yet read in a shared way using a shared_lock, provided one uses a shared_mutex?
I am unsure, but it seems safe to do it this way.