ROOT_Application  2.0
C++ Core modules and GUIStock
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TCPServer.cpp
Go to the documentation of this file.
1 
8 #include "TCPServer.h"
9 #include "StockManager.h"
10 
11 #include <iostream>
12 #include <istream>
13 
14 #include <boost/bind.hpp>
15 #include <boost/asio/streambuf.hpp>
16 
18 
19 inline
20 tcp_connection::pointer tcp_connection::create(boost::asio::io_service& io_service) {
21 #ifdef DEBUG_CPP
22  std::cout << "tcp_connection::create return a new tcp_connection" << std::endl;
23 #endif
24 
25  return pointer(new tcp_connection(io_service));
26 }
27 
28 // to modify for my purpose, in RealTime case, nothing to send back ! or just ok no error !
30 
31 #ifdef DEBUG_CPP
32  std::cout << " tcp_connection::start() will associate handle_write" << std::endl;
33 #endif
34 
35  // make async. call and send callback to handle_write
36  // not sure how working but response not used at this point
37  boost::asio::async_write(socket_, boost::asio::buffer(message_),
38  boost::bind(&tcp_connection::handle_write, shared_from_this(),
39  boost::asio::placeholders::error,
40  boost::asio::placeholders::bytes_transferred));
41 }
42 
43 // to send an answer
44 void tcp_connection::handle_write(const boost::system::error_code& error,
45  size_t /*bytes_transferred*/)
46 {
47 #ifdef DEBUG_CPP
48  std::cout << "Entry connection::handle_write, do nothing " << std::endl;
49  //std::cout << "check error " << error << " size message back " << bytes_transferred << std::endl;
50  // not working, message already sent... what the point, receive the answer ??
51  //message_= "modified toto";
52 #endif
53 
54  if ( error ) {
55  std::cout << "tcp_connection::handle_write get error " << error << std::endl;
56  }
57 }
58 
59 
61 
62 // Constructor with StockManager and call with io_service, deprecated
63 TCPServer::TCPServer ( StockManager *sm, unsigned int iport, boost::asio::io_service& io_service ) :
64  f_sm( sm ), port( iport ), acceptor_( io_service, tcp::endpoint(tcp::v4(), iport)) {
65 #ifdef DEBUG_CPP
66  std::cout << "Constructor TCPServer port with stockmanager " << port << " " << f_sm << std::endl;
67 #endif
68  // default at creation
69  is_running = false;
70  want_exit = false;
71  //t_ = std::thread{ &TCPServer::LoopThread, this };
72 }
73 
74 // io_service created inside the function
75 TCPServer::TCPServer ( StockManager *sm, unsigned int iport ) :
76  f_sm( sm ), port( iport ), my_io_service(), acceptor_( my_io_service, tcp::endpoint(tcp::v4(), iport)) {
77 #ifdef DEBUG_CPP
78  std::cout << "Constructor TCPServer port with stockmanager " << port << " " << f_sm << std::endl;
79 #endif
80  // do not compile
81  //acceptor_ = tcp::acceptor(my_io_service, tcp::endpoint(tcp::v4(), port));
82 
83  // default at creation
84  is_running = false;
85  want_exit = false;
86  //t_ = std::thread{ &TCPServer::LoopThread, this };
87 }
88 
90 {
91 #ifdef DEBUG_CPP
92  std::cout << "Destructor TCPServer " << std::endl;
93 #endif
94  // not necessary in unit_test run_stop_miss_stop, but if active ??
95  if ( is_running )
96  StopServer();
97 
98  // important bracket, lock is deleted this way
99  /*
100  {
101  std::unique_lock<std::mutex> lock( loop_mutex );
102  want_exit = true;
103  loop_cv.notify_one();
104  }
105  // wait termination of the thread
106  t_.join();
107  */
108 }
109 
110 
112 {
113 #ifdef DEBUG_CPP
114  std::cout << "Entry TCPServer::StartServer() TCPServer port " << port << std::endl;
115 #endif
116 
117  //is_running = true;
118  t_ = std::thread{ &TCPServer::LoopThread, this };
119 
120 
121  //acceptor_.open( my_io_service, tcp::endpoint(tcp::v4(), port));
122  //acceptor_.open(); //tcp::endpoint(tcp::v4(), port));
123  /* http://stackoverflow.com/questions/15829165/how-to-launch-an-event-when-my-boostasio-tcp-server-just-start-running-aka/15883510#15883510
124  * http://stackoverflow.com/questions/15926077/boostasio-acceptor-reopen-and-async-read-after-eof
125  using boost::asio::ip::tcp;
126  tcp::endpoint endpoint_(tcp::v4(), 123);
127  tcp::acceptor acceptor_(io_service); // closed state
128  acceptor.open(endpoint_.protocol()); // opened state
129  acceptor.bind(endpoint);
130  acceptor.listen();
131  */
132  /*
133  std::cout << "test is open " << acceptor_.is_open() << std::endl;
134  if ( acceptor_.is_open() == false ) {
135  tcp::endpoint endpoint_(tcp::v4(), port);
136  acceptor_.open( endpoint_.protocol() );
137  //acceptor_.bind(endpoint_);
138  //acceptor_.listen();
139  }
140  // recreate connection
141  start_accept();
142  */
143 
144  // here important mutex ?? bracket ??
145  {
146  std::unique_lock<std::mutex> lock( loop_mutex );
147  is_running = true;
148 
149  // new added delete _connection, here need to recreate
150  // signal condition variable has changed. equivalent notify_all() here
151  loop_cv.notify_one();
152  }
153 
154 }
155 
156 // allow to not have the TCP recreating when deleting
158 #ifdef DEBUG_CPP
159  std::cout << "TCPServer::StopServer is_running " << is_running << std::endl;
160 #endif
161 
162  acceptor_.get_io_service().stop();
163  // important bracket, lock is deleted this way
164  {
165  std::unique_lock<std::mutex> lock( loop_mutex );
166  want_exit = true;
167  loop_cv.notify_one();
168  }
169  // wait termination of the thread
170  //t_.join();
171  is_running = false;
172  t_.join();
173 
174  // not sure if running or not
175  /*
176  if ( is_running ) {
177 
178  //{
179  std::unique_lock<std::mutex> lock( loop_mutex );
180  // update status
181  is_running = false;
182  loop_cv.notify_one();
183  //}
184  */
185  // not promising...
186  // not enought, tcp_connection still alive and recreated..
187  // stopping io_service is not enough, maybe acceptor.listen / stop_listen
188  // try to deal with connection
189  //delete _connection.get();
190  /*
191  std::cout << "before reset() count() " << _connection.use_count() << std::endl;
192  _connection.reset();
193  std::cout << "after reset() count() " << _connection.use_count() << std::endl;
194  */
195 
196  // necessary ?? yes otherwise service always running, cannot exit
197 
198  // thread will make an other loop
199  // reset() must be called before run()
200  // in fact it is received, but executed after the restart !!
201  // need stop to quit from run()
202 
203 
204  // This function is used to close the acceptor. Any asynchronous accept operations will be cancelled immediately.
205  // A subsequent call to open() is required before the acceptor can again be used to again perform socket accept operations.
206  //acceptor_.cancel();
207  //acceptor_.close();
208  //}
209 
210  // reset() must be called before run()
211  //acceptor_.get_io_service().stop();
212 }
213 
214 // thread created in constructor, enter infinite loop to keep the thread alive until deletion of the object
216 #ifdef DEBUG_CPP
217  std::cout << "TCPServer::LoopThread " << std::endl;
218 #endif
219 
220  // here create one initial tcp_connection, bound to handle_accept
221  // commented if connection deal in Start/Stop server
222  start_accept();
223 
224  while ( true ) {
225 
226  { //not sure important here, important to release the lock, stopserver blocked otherwise
227  // must aquire the lock first
228  std::unique_lock<std::mutex> lock( loop_mutex );
229  // usual call, no predicate
230  // wait(unique_lock<mutex>& lk);
231 
232  std::cout << "TCPServer::LoopThread will wait a notify " << std::endl;
233  // while predicate is false, wait
234  // wait relax the lock, wait for a signal, relock and executes
235  loop_cv.wait( lock, [&](){ return is_running || want_exit ; } );
236  //loop_cv.wait( lock );
237 
238  if ( want_exit ) {
239  std::cout << "LoopThread will exit and terminate" << std::endl;
240  return;
241  }
242 
243  }
244 
245  std::cout << "TCPServer::LoopThread() before acceptor_.get_io_service().run() " << std::endl;
246 
247  // blocking function, reset() solve the problem, must be called before a new run
248  //acceptor_.get_io_service().reset();
249  acceptor_.get_io_service().run();
250 
251  // will pass when get_io_service.stop() will be called
252  //std::cout << "TCPServer::LoopThread() after acceptor_.get_io_service().run() " << std::endl;
253  }
254 
255 
256 }
257 
258 // create the connection, wait to receive a message async.
260 
261 #ifdef DEBUG_CPP
262  std::cout << "TCPServer::start_accept create connection & async_accept" << std::endl;
263 #endif
264 
265  // create a new connection
266  tcp_connection::pointer new_connection = tcp_connection::create(acceptor_.get_io_service());
267  //_connection =
268  // tcp_connection::create(acceptor_.get_io_service());
269 
270 
271  // Verify socket is in a closed state. Private
272  //new_connection->socket_.close();
273  // no change, next execution call new_connection is close
274  //new_connection->GetSocket().close();
275 
276  // use a non-blocking call and bind the received message to handle_accept
277  acceptor_.async_accept( new_connection->socket(),
278  boost::bind(&TCPServer::handle_accept, this, new_connection,
279  boost::asio::placeholders::error));
280 }
281 
282 // new_connection is used to read the message (and answer)
283 // but it is automatically deleted at the end of the function. An other connection is created at the very end.
284 // f_sm->SendDataCSV() is run synchro. in this function (but this function is run async.)
286  const boost::system::error_code& error)
287 {
288 #ifdef DEBUG_CPP
289  std::cout << "TCPServer::handle_accept, new_connection->socket().is_open() " << new_connection->socket().is_open() << std::endl;
290  std::cout << "is_running " << is_running << std::endl;
291 #endif
292 
293  boost::asio::streambuf message;
294  std::string tmp_string;
295 
296  // what kind of error ??
297  if ( error )
298  std::cout << " TCPServer::handle_accept() error: " << error << std::endl;
299 
300  // fix delimiter " END" for the end of the message
301  try {
302  boost::asio::read_until( new_connection->socket(), message, " END");
303 
304 //#ifdef DEBUG_CPP
305  // do not compile
306 // std::cout << "after read message: " << message.data() << std::endl;
307 //#endif
308 
309  // transform message to string, tricky, but from website
310  std::istream is(&message);
311  std::getline(is, tmp_string);
312  // delete the end delimiter: " END"
313  tmp_string.erase(tmp_string.size()-4);
314 //#ifdef DEBUG_CPP
315 // std::cout << "TCPServer::handle_accept message_string!" << tmp_string << "!" << std::endl;
316 //#endif
317 
318  // call to StockManager function to extract the data and update the Stock
319  // throw StockException if bad format
320  std::cout << "before SendDataCSV" << std::endl;
321  f_sm->SendDatadCSV( tmp_string );
322 
323  } catch (boost::system::system_error &e) {
324  std::cout << "Got exception boost::system::system_error from boost:asio::read_until() " << e.what() << std::endl;
325 
326  // StockExcpetion if bad format, avoids crash, and start_accept is called again
327  } catch (StockException &e ) {
328  std::cout << "\nTCPServer, got a StockException " << e.what() << std::endl;
329  // set error_code in the boost way
330  // or throw C++11 exception:
331  // Would need to copy into the main thread. From here:
332  // catch(..)
333  // err = current_exception()
334  // a SM::SetThreadError(exception_ptr &err) (in a list or queue), why not in RealTime ? use observer pattern 2 types of messages ?
335  // then main thread checks (Timer again) if some errors occurred
336  // more general :
337 
338  } catch (std::exception &e ) {
339  std::cout << "Got base std::exception " << e.what() << std::endl;
340  } catch (...) {
341  std::cout << "Got unknow ... exception " << std::endl;
342  }
343 
344  // assign a non-blocking function to answer to the message, not used now
345 /*
346  if (!error) {
347 #ifdef DEBUG_CPP
348  std::cout << "TCPServer::handle_accept() run new_connection->start" << std::endl;
349 #endif
350  // seems only to associate handle_write, which is not used.
351  // if commented, working as well, certainly cannot send back data
352  //new_connection->start();
353  } else {
354  std::cout << "Error in TCPServer::handle_accept " << error << std::endl;
355  }
356 */
357 
358  // restart a new connection for receiving data
359 //#ifdef DEBUG_CPP
360 // std::cout << "TCPServer::handle_accept() run start_accept()" << std::endl;
361 //#endif
362  start_accept();
363 
364 #ifdef DEBUG_CPP
365  std::cout << "End TCPServer::handle_accept()" << std::endl;
366 #endif
367 }
Singleton class, stores all loaded stocks.
Definition: StockManager.h:42
std::condition_variable loop_cv
Definition: TCPServer.h:220
tcp::acceptor acceptor_
...
Definition: TCPServer.h:205
virtual const char * what() const noexcept
void start_accept()
create a new tcp_connection to receive message
Definition: TCPServer.cpp:259
tcp_connection(boost::asio::io_service &io_service)
Private constructor.
Definition: TCPServer.h:88
std::mutex loop_mutex
Definition: TCPServer.h:219
void StopServer()
Stop the server.
Definition: TCPServer.cpp:157
Base Exception for Stock and StockManager, all src_cpp code.
std::string message_
Message received by the socket.
Definition: TCPServer.h:103
void StartServer()
Start the server.
Definition: TCPServer.cpp:111
TCPServer(StockManager *sm, unsigned int iport)
Constructor, io_service is a private data member (my_io_service).
Definition: TCPServer.cpp:75
bool is_running
Indicate if the server is running.
Definition: TCPServer.h:209
static pointer create(boost::asio::io_service &io_service)
Static function to create a new socket.
Definition: TCPServer.cpp:20
void LoopThread()
Function run by the internal thread.
Definition: TCPServer.cpp:215
void handle_accept(tcp_connection::pointer new_connection, const boost::system::error_code &error)
handle the received message: parse and send string data to StockManager.
Definition: TCPServer.cpp:285
void SendDatadCSV(const std::string &message)
Extract info from the message coming from TCPServer, update the Lists and notify observers (GUIMainCo...
virtual ~TCPServer()
Destructor.
Definition: TCPServer.cpp:89
Stores all stocks and defines convenient functions.
unsigned int port
Port where the server listens.
Definition: TCPServer.h:198
std::thread t_
Internal thread.
Definition: TCPServer.h:214
void handle_write(const boost::system::error_code &error, size_t bytes_transferred)
Function called async.
Definition: TCPServer.cpp:44
StockManager * f_sm
Pointer to the StockManager singleton, will receive and treat the messages.
Definition: TCPServer.h:196
boost::shared_ptr< tcp_connection > pointer
Definition: TCPServer.h:62
void start()
Function to be executed when a message must be sent back.
Definition: TCPServer.cpp:29
bool want_exit
Indicates that the thread must exit; set to true by StopServer(), which the destructor calls when the server is running.
Definition: TCPServer.h:211
tcp::socket socket_
Store an io_service.
Definition: TCPServer.h:99
Server to receive new Instantaneous data from ServerPortfolio.