27 import sys, logging, time
29 import Queue, threading
56 threading.Thread.__init__(self,name=name)
58 self.
logger = logging.getLogger(
'SP.AutoParser.ThreadParser')
59 self.logger.debug(
"init thread parser")
70 self._update_stock_th.set_write_to_file(
True)
81 print "destructor of thread parser ", self.name
90 self.logger.debug(
"thread _parser %s run() " % self.name)
99 tuple_item = self._queue_jobs.get()
101 self.logger.debug(
"get from queue namestock %s, action %s" % tuple_item)
102 namestock, action = tuple_item
107 self.logger.debug(
"Catch exception in thread_parser")
123 if action ==
"InstValue" :
130 self._update_stock_th.update_data( namestock )
132 print "AutoParser Got exception, just break "
133 self.logger.warning(
"AutoParser got exception in GetInstValues %s" % namestock )
228 print "a different action to implement ", action
229 assert ( 1 == 2 ),
"a different action to implement"
239 threading.Thread.__init__(self, name=
'queue_thread')
241 self.
logger = logging.getLogger(
"SP.AutoParser.Queue")
242 self.logger.info(
'init thread queue')
260 update_stock.set_write_to_file(
True)
265 self.logger.info(
"list_stocks: %s" % update_stock.get_stock_keys())
270 self.list_inactive_stock=[]
272 self.list_active_stock=[]
282 for symbol
in update_stock.get_stock_keys():
287 if update_stock.check_open( symbol ):
289 self.list_active_stock.append(symbol)
292 self.list_inactive_stock.append(symbol)
301 all_lines = update_stock.print_dict_stocks(
'InstValue')
302 self.logger.debug(
"\n%s",all_lines)
308 self._list_thread_parser=[]
313 print "init handler_queue"
314 print "list_active_stock"
315 print self.list_active_stock
316 print "list_inactive_stock"
317 print self.list_inactive_stock
320 self.logger.info(
"list active %s" % ([stock
for stock
in self.list_active_stock]) )
321 self.logger.info(
"list inactive %s" % ([stock
for stock
in self.list_inactive_stock]) )
325 print "\ndestructor handler_queue\n"
347 for i
in range(nb_threads_parser):
348 name_thread =
"t_parser-" + str(i)
350 self._list_thread_parser.append(t)
361 print "thread_handle_queue run() will update every ", TIMEPARSER,
" sec."
367 for symbol
in self.list_inactive_stock:
377 time_to_open = obj_stock.time_to_open()
378 self.logger.debug(
"time_to_open %s %d" % (symbol,time_to_open.seconds))
382 if (time_to_open.days==0)
and (time_to_open.seconds < 3600):
385 self.list_inactive_stock.remove(symbol)
386 self.list_active_stock.append(symbol)
389 obj_stock.state =
'WAIT_OPEN'
390 self.logger.info(
"CLOSED->WAIT_OPEN, %s will open in %d seconds" % (symbol,time_to_open.seconds))
395 for symbol
in self.list_active_stock:
401 if obj_stock.state ==
'OPEN':
408 if obj_stock.get_market() ==
'ml_euro':
411 if obj_stock.test_ML_done():
413 self.list_active_stock.remove(symbol)
414 self.list_inactive_stock.append(symbol)
416 self.logger.info(
"ML market %s done, move to inactive", stock)
420 self.logger.debug(
"put stock, action %s %s" % (symbol,
"InstValue"))
421 self._queue_jobs.put ( (symbol,
"InstValue") )
423 elif obj_stock.state ==
'WAIT_OPEN':
424 self.logger.debug(
"WAIT_OPEN insert to the queue")
425 self._queue_jobs.put ( (symbol,
"InstValue") )
430 elif obj_stock.state ==
'CLOSED':
431 self.list_active_stock.remove(symbol)
432 self.list_inactive_stock.append(symbol)
433 self.logger.info(
"%s OPEN->CLOSED, moved to inactive_stock", symbol)
445 time.sleep ( TIMEPARSER )
460 self.logger.info(
"%s Force InsertToQueue", stock)
466 if obj_stock.is_valid():
473 self.list_active_stock.append( stock )
475 self.list_inactive_stock.append( stock )
477 print "Insert To Queue new list active ", self.list_active_stock
478 print "Insert To Queue new list inactive ", self.list_inactive_stock
481 self.logger.info(
"InvalidStock discarded")
492 if __name__ ==
"__main__":
497 logging.basicConfig(level=logging.DEBUG)
499 logger = logging.getLogger(
'SP')
500 main_logger = logging.getLogger(
"SP.main")
510 print "FIRST: threading.enumerate ",threading.enumerate()
514 print "tqh.name ", tqh.name,
" is Alive:", tqh.is_alive(),
" is daemon:", tqh.daemon
518 print "tqh.name ", tqh.name,
" is Alive:", tqh.is_alive(),
" is daemon:", tqh.daemon
524 print "SECOND: enumerate ",threading.enumerate()
539 print "LAST : enumerate ",threading.enumerate()
_update_stock_th
internal reference to the singleton DictionaryStocks should be an access to UpdateStocks now...
Define class UpdateStocks and ParserFactory.
def InsertToQueue
Function to force an update, insert directly the stock in the queue.
def __del__
Contains the threads who will listen to the queue.
def run
Consume a stock and an action to perform form the queue Action implemented : InstValue To extend to ...
def __init__
Constructor for one thread_parser.
_queue_jobs
queue to read the jobs to do, what to put ? a Stock or the symbol ?
Independent thread parsing the web pages, consuming data from the queue.
_update
use to stop the threads __prefix with name class, safer or stupid ! needed ?? no just wait something ...
Container of all Stocks objects, it also reads the static stocks configuration file "dictstocks...
Global variables for configuration: paths, TCP ports and generic definitions.
def create_threads_parser
Create and start the threads, called at initialisation.
Update data of a list of Stock's (only one action by call is possible).
def run
Fill the queue every TIMEPARSER sec.
Thread to handle the queueing system and create the thread_parsers.
Define singleton class DictionaryStocks, act as the main container of Stocks objects.