Binding methods for asynchronous threads

I'm creating a logger which, instead of writing to file, stdout, or stderr, writes to a socket so that it can be logged by a separate process (or on a separate machine).

The trick is that this logger runs in a time-sensitive real-time scheduler. Therefore I cannot wait for the OS to block my process during sending operations. Boost's asio library looks like a good option.

My problem is using the bind function on line 23. The intention is:
io_service.post() will execute the argument function in the thread which "run" has been invoked. I want to run: m_socket.send(message) in that thread. To do that, I am using boost bind with the arguments shown in line 23, but I think I have a syntax issue. Any ideas would be appreciated.

The sending class looks like this:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>

class StreamSender
{
    StreamSender() :
      m_io_service(),
      m_socket(m_io_service),
      m_work(m_io_service),
      m_ip("localhost"),
      m_port("32323")
    {
        m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, 
                                               &m_io_service));
        Start();
    }
    
public: 
    void Send(const std::string& message)
    {
        m_io_service.post(boost::bind(&boost::asio::ip::tcp::socket::send, 
                                      &m_socket, 
                                      boost::asio::buffer(message)));
    }
    
    void Start() // Can be called to reconnect.
    {
        boost::asio::ip::tcp::resolver resolver(m_io_service);
        boost::asio::ip::tcp::resolver::iterator endpoint 
            = resolver.resolve(boost::asio::ip::tcp::resolver::query(m_ip, m_port));
        boost::asio::connect(this->m_socket, endpoint);
    }
    
    static StreamSender* GetInstance() // Singleton pattern
    {
        if (!m_instance)
            m_instance = new StreamSender();
        return m_instance;
    }
    
    ~StreamSender()
    {
        m_io_service.stop();     // IO_Service no longer accepts new jobs
        m_threadpool.join_all(); // Waits for current jobs to finish executing
    }
private:
    static StreamSender* m_instance;
  
    boost::asio::io_service       m_io_service;
    boost::asio::ip::tcp::socket  m_socket;
    boost::thread_group           m_threadpool;
    boost::asio::io_service::work m_work;
    
    std::string                   m_ip;
    std::string                   m_port;
};

StreamSender* StreamSender::m_instance = 0;
Last edited on
I've also been trying:
1
2
3
4
boost::bind(&boost::asio::ip::tcp::socket::send, 
            &m_socket, 
            _1
           )(boost::asio::buffer(message))


The errors I get are:
$ g++ -o client main.cpp -L/usr/lib -lboost_thread -lboost_system
In file included from logger.h:3:0,
                 from main.cpp:1:
streamsender.h: In member function ‘void StreamSender::Send(const string&)’:
streamsender.h:25:14: error: no matching function for call to ‘bind(<unresolved overloaded function type>, boost::asio::ip::tcp::socket*, const boost::arg<1>&)’
            _1)(boost::asio::buffer(message)));
              ^


This is funny because straight from Boost's website they give this as an example of how to use boost::bind:
1
2
3
4
5
struct X { 
  bool f(int a); 
} x;

bind(&X::f, &x, _1)(i);   // (&x)->f(i) 
I've also tried to call a local member function instead of one owned by boost's socket:
1
2
3
4
5
6
7
8
9
10
    void transmit(const std::string& message)
    {
        m_socket.send(boost::asio::buffer(message));
    }
    
public: 
    void Send(const std::string& message)
    {
        m_io_service.post(boost::bind(&StreamSender::transmit, this, _1)(message));
    }


But with this configuration I get the following:
$ g++ -o client main.cpp -L/usr/lib -lboost_thread -lboost_system
In file included from logger.h:3:0,
                 from main.cpp:1:
streamsender.h: In member function ‘void StreamSender::Send(const string&)’:
streamsender.h:28:82: error: invalid use of void expression
         m_io_service.post(boost::bind(&StreamSender::transmit, this, _1)(message));
                                                                                  ^
Got it!:
1
2
3
4
5
6
7
8
9
10
    void transmit(const std::string& message)
    {
        m_socket.send(boost::asio::buffer(message));
    }
    
public: 
    void Send(const std::string& message)
    {
        m_io_service.post(boost::bind(&StreamSender::transmit, this, message));
    }



Full code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>

class StreamSender
{
    StreamSender() :
      m_io_service(),
      m_socket(m_io_service),
      m_work(m_io_service),
      m_ip("localhost"),
      m_port("32323")
    {
        m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, 
                                               &m_io_service));
        Start();
    }
	
	void transmit(const std::string& message)
	{
		m_socket.send(boost::asio::buffer(message));
	}
    
public: 
    void Send(const std::string& message)
    {
        m_io_service.post(boost::bind(&StreamSender::transmit, this, message));
    }
    
    void Start() // Can be called to reconnect.
    {
        boost::asio::ip::tcp::resolver resolver(m_io_service);
        boost::asio::ip::tcp::resolver::iterator endpoint 
            = resolver.resolve(boost::asio::ip::tcp::resolver::query(m_ip, m_port));
        boost::asio::connect(this->m_socket, endpoint);
    }
    
    static StreamSender* GetInstance() // Singleton pattern
    {
        if (!m_instance)
            m_instance = new StreamSender();
        return m_instance;
    }
    
    ~StreamSender()
    {
        m_io_service.stop();     // IO_Service no longer accepts new jobs
        m_threadpool.join_all(); // Waits for current jobs to finish executing
    }
private:
    static StreamSender* m_instance;
  
    boost::asio::io_service       m_io_service;
    boost::asio::ip::tcp::socket  m_socket;
    boost::thread_group           m_threadpool;
    boost::asio::io_service::work m_work;
    
    std::string                   m_ip;
    std::string                   m_port;
};

StreamSender* StreamSender::m_instance = 0;
Topic archived. No new replies allowed.