ServerPortfolio  2.0
Python parsers and server
AutoParser.py
1 ## @package serverportfolio.AutoParser
2 # Module for updating the dictionary of stocks using independent threads every TIMEPARSER seconds,\n
3 # and for parsing data on demand from the server.
4 #
5 # The module loops over all stocks and records their values in a 'CSV' style (in /data/historical/*INST.dcsv).
6 # To extend it to different actions, the threads may be used for other parsing actions (FORCE...).
7 #
8 # Implements a queueing system of stocks which are regularly 'parsed' by independent threads:
9 # - the queue thread loops every TIMEPARSER seconds,
10 # - it puts/removes stocks in the queue depending on their state: CLOSED, WAIT_OPEN or OPEN.
11 #
12 # The parser threads wait for data in the queue
13 # and implement a simple lock if the Server waits for an answer.
14 #
15 # The queue is run in a separate (non-daemon) thread,
16 # which creates NB_THREADS_PARSER daemon threads for parsing;
17 # this way, the main thread is still free.
18 #
19 # To add: list_error_stock, easy to deal with; may be checked when the Dictionary is reloaded, for instance.
20 # May try to distinguish a temporary problem (internet connection / slow) from an error in a key, with an error value...
21 #
22 # Rules for the states
23 # States for stock CLOSED(inactive)->WAIT_OPEN(active,but not write)->OPEN(active and write)->CLOSED(inactive)
24 #
25 # LastChanged $Id: AutoParser.py 9 2015-04-02 23:27:25Z michael $
26 
27 import sys, logging, time
28 # for threading and queue
29 import Queue, threading
30 
31 import serverportfolio.GlobalDicts as GlobalDicts
32 from serverportfolio.DictionaryStocks import DictionaryStocks
33 from serverportfolio.UpdateStocks import UpdateStocks
34 # need module for parsing, will import Bourso or Yahoo parser. Included
35 #from serverportfolio.Parsers.ParserStocks import ParserStocks
36 
37 ## @brief Number of seconds to wait between two parsing rounds
38 TIMEPARSER=30
39 ## @brief Number of threads waiting for data in the queue. It is fixed at the initialisation.
40 NB_THREADS_PARSER=3
41 
42 ## @class thread_parser
43 # @brief Independent thread parsing the web pages, consuming data from the queue.
44 class thread_parser(threading.Thread):
45 
46  # may implement a counter of instances, check when there is crash...
47  # count=0
48 
49  ## @brief Constructor for one thread_parser.
50  # @param queue queue of (symbol, action) tuples waiting to be parsed
51  #
52  # @param name name of the thread
53  def __init__( self,queue, name ):
54 
55  # parent class initialization
56  threading.Thread.__init__(self,name=name)
57 
58  self.logger = logging.getLogger('SP.AutoParser.ThreadParser')
59  self.logger.debug("init thread parser")
60 
61  ## queue of jobs to do; each item is a (symbol, action) tuple
62  self._queue_jobs = queue
63  ## internal reference to the singleton DictionaryStocks
64  # should be an access to UpdateStocks now, one class for each parser certainly better
65  #self._dict_stocks = dict_stocks
66 
67  # nice to have an unique object with an unique parser
68  self._update_stock_th = UpdateStocks( 'InstValue' ) # fixed_parser='InstValue' )
69  # set writing mode. done by UpdateStocks as post-process
70  self._update_stock_th.set_write_to_file(True)
71  # could make a link to the dictionary in UpdateStocks, used for get_stocks,
72 
73  ## flag used to stop the thread from the outside
74  # (a __ prefix would mangle it with the class name: safer or overkill?)
75  # possibly not needed: the thread just waits for something in the queue
76  # @todo check if necessary
77  self._update = True
78 
79  # test
80  def __del__(self):
81  print "destructor of thread parser ", self.name
82 
83  ## @brief Consume a stock and an action to perform from the queue.
84  # Action implemented: InstValue\n
85  # To be extended to more actions\n
86  def run(self):
87 
88  # here add condition on the queue, just execute the command
89  # tests should be done previously
90  self.logger.debug("thread _parser %s run() " % self.name)
91 
92  # update allows to finish the thread from the outside
93  count=0
94  while self._update:
95 
96  # try to get a job to do
97  try:
98  #print "before reading queue"
99  tuple_item = self._queue_jobs.get()
100  #print "get from queue namestock ",namestock, action
101  self.logger.debug("get from queue namestock %s, action %s" % tuple_item)
102  namestock, action = tuple_item
103  #print "get from queue namestock, action ",namestock, action
104 
105  # this except clause has never been seen executed
106  except:
107  self.logger.debug("Catch exception in thread_parser")
108  raise
109 
110  # happens if WAIT_OPEN only,
111  # should not if error check before, never in list_active !!
112 # if self.dict_stocks.GetError( namestock ) != 0:
113 # print "AutoParser found error BEFORE GetInstValues ", namestock
114 # # let info until problem solved
115 # self.logger.info("AutoParser found error %s BEFORE GetInstValues" % namestock)
116 # # make difference ??
117 # break
118  #continue
119 
120  ######################################################################
121  # Main point, getting instant. values of a stock
122 # now parser created at start, e_action fixed (or variable?), should re-use the parser, still need to wait it finishes
123  if action == "InstValue" :
124 
125 
126  # get last values from the web
127  # an exception (problem with connection or parsing) should not kill the thread!
128  # now all the update is done there (except ML)
129  try :
130  self._update_stock_th.update_data( namestock )
131  except Exception :
132  # log and go on with the next job instead of leaving the while loop
133  self.logger.warning("AutoParser got exception in GetInstValues %s" % namestock )
134  continue
135 
136  # here ok, active, need a last_modification/last_saved
137  # avoid some call to wrapper, read file. Could be done in Stock as well !
138  #
139  # post-process nothing to do, except SavingToFile
140  # obj_stock = DictStocks().get_stocks( namestock )
141  # obj_stock.save_inst_value()
142 
143  # check about error given by GetInstValues, problem with connection or parsing, should not kill the thread
144 # if self.dict_stocks.GetError( namestock ) != 0:
145 # # let info until problem solved
146 # self.logger.warning( "AutoParser found error after GetInstValues %s" % namestock )
147 # print "AutoParser found error after GetInstValues ", namestock
148 # break
149 
150 # post processing, could be done by Stock itself, inside GetInstValue,
151 # or inside a Parser function ( parser belong to the thread, can be re-used ), now call run_parser( namestock ),
152 # or post-processing called by thread, updated_stock ( e_action ) : Dictionary calls write file, send TCP message...
153 # release lock...
154  # test should not happen
155  # if dict_data == None:
156  # self.logger.critical("dict_data is null ",namestock)
157 
158 
159  # before test if new state is closed,
160  # check state if 'CLOSED', no need to deal with file opening and closing file for nothing
161 # if (dict_data['state']=='CLOSED'):
162 #
163 # # means stock was opened (or WAIT_OPEN) and has closed, change state in dict_stocks
164 # if self.dict_stocks.GetState( namestock ) == 'OPEN':
165 # self.dict_stocks.SetState( namestock, 'CLOSED' )
166 #
167 # # copy a last time the value to be sure to have final data of the day
168 #
169 # # could check if different..., function test_change ??
170 # #DictStocks[namestock]['data']=dict_data
171 # self.dict_stocks.AddVolatileData( namestock, dict_data )
172 # self.logger.info("%s OPEN->CLOSED", namestock)
173 #
174 # # means stock wait for opening, let in the state
175 # #else if DictStocks[namestock]['state'] == 'WAIT_OPEN' :
176 # #print "stock %s remains WAIT_OPEN" % (namestock)
177 #
178 # # means stock is 'OPEN'
179 # else:
180 #
181 # # test, should never happen !! assert
182 # #if dict_data['state'] != 'OPEN':
183 # if obj_stock.state != 'OPEN':
184 # #print "ERROR WITH STATE IN THREAD_PARSER"
185 # self.logger.critical("%s ERROR WITH STATE IN THREAD_PARSER", namestock)
186 # assert ( 1 == 2 ), "ERROR WITH STATE IN THREAD_PARSER"
187 # sys.exit(0)
188 #
189 # # update WAIT_OPEN->OPEN, no change for the queue already in active_list
190 # #if self.dict_stocks.GetState( namestock ) == 'WAIT_OPEN':
191 # # self.dict_stocks.SetState( namestock, 'OPEN' )
192 # if obj_stock.state == 'WAIT_OPEN':
193 #
194 # # write anyway in this case because of the ML (otherwise need to compare date and time)
195 # # done by the parser now
196 # # self.dict_stocks.SaveInstValue( namestock, dict_data )
197 #
198 # # should be in Stock all this,
199 # # assure the next test will true, not recorded twice
200 # # last modif not necessary because 'time'
201 # self.dict_stocks.SetLastModification( namestock, dict_data['time'] )
202 # self.dict_stocks.AddVolatileData( namestock, dict_data )
203 #
204 # # set to log
205 # self.logger.info("WAIT_OPEN->OPEN %s", namestock )
206 #
207 # # to put in Stock
208 # # test for saving into file
209 # # test if different time, if WAIT_OPEN->OPEN, will be true ( last is different or null )
210 # if dict_data['time'] != self.dict_stocks.GetLastModification( namestock ):
211 # #print "save record stock %s" % namestock
212 # #print "last_modif :",DictStocks[namestock]['last_modification']
213 # #print "time :",dict_data['time']
214 # self.dict_stocks.SetLastModification ( namestock, dict_data['time'] )
215 # self.dict_stocks.AddVolatileData( namestock, dict_data )
216 #
217 # # write into file if modified
218 # #self.dict_stocks.SaveInstValue( namestock, dict_data )
219 # self.dict_stocks.SaveInstValue2( namestock, dict_data )
220 # # uncomment for tests
221 # #else :
222 # # print "%s time has not evolved or first 'OPEN' after 'WAIT_OPEN', do not record " % (namestock)
223 
224 
225 
226  ################################################################
227  else :
228  print "a different action to implement ", action
229  assert ( 1 == 2 ), "a different action to implement"
230 
231 ## @class Thread_Handle_Queue
232 # @brief Thread to handle the queueing system and create the thread_parsers.
233 class Thread_Handle_Queue(threading.Thread):
234 
235  ## @brief Constructor.
236  # Size of the FIFO queue set to 10.
237  def __init__( self ): #, dict_stocks ) :
238  # init baseclass first
239  threading.Thread.__init__(self, name='queue_thread')
240 
241  self.logger = logging.getLogger("SP.AutoParser.Queue")
242  self.logger.info('init thread queue')
243 
244  #print "init thread_handle_queue"
245  # control for stopping the run loop
246  self._running=True
247 
248  # init a FIFO queue of max size 10
249  self._queue_jobs=Queue.Queue( 10 )
250 
251  # not necessary, only to dispatch active/inactive in init
252  # Copy of an internal list of stock symbols loaded in DictionaryStocks
253  # maybe not even necessary; convenient, but may need to be updated!
254  #self._list_stocks = self.dict_stocks.get_stock_keys()
255  #self._list_stocks = DictionaryStocks().get_stock_keys()
256 
257  # initialise only for the check_open in init
258  update_stock = UpdateStocks( 'InstValue' )
259  # will write the first value in check_open(), only if the stock is OPEN
260  update_stock.set_write_to_file(True)
261 
262  # make local reference to the singleton object, maybe not too bad... used all the time
263  #self.dict_stocks = dict_stocks
264 
265  self.logger.info("list_stocks: %s" % update_stock.get_stock_keys()) #self._list_stocks)
266 
267  ## @brief Important lists, a stock should belong to only one of these lists!
268  # considered as active: 5 minutes before opening and until closed is detected
269  ## list of temporarily closed stocks.
270  self.list_inactive_stock=[]
271  ## list of open ( or wait_open ) stocks.
272  self.list_active_stock=[]
273 
274  # list of stock which cannot be parsed, to move to Stock, only exception can make them move to inactive
275  # but must be done by threads
276  #self.list_error_stock=[]
277 
278  # initialize the list_active / inactive / error stocks
279  # here do not use threads (how to get back the answer? by another queue!)
280  # here again an error could be associated to a stock: error_parser, error_X
281  #for symbol in self._list_stocks:
282  for symbol in update_stock.get_stock_keys():
283  # check if 'state' = OPEN or CLOSE
284  # may set 'state' = ERROR if cannot parse
285 
286  #if self.dict_stocks.get_stocks( symbol).check_open():
287  if update_stock.check_open( symbol ):
288  #print "check open running..."
289  self.list_active_stock.append(symbol)
290 
291  else:
292  self.list_inactive_stock.append(symbol)
293 
294  # to initialize the error state
295  #else:
296  #if self.dict_stocks.dictstocks[stock]['error'] == 0 :
297  # self.list_inactive_stock.append(stock)
298  #else :
299  #self.list_error_stock.append(stock)
300 
301  all_lines = update_stock.print_dict_stocks('InstValue')
302  self.logger.debug("\n%s",all_lines)
303  #time.sleep(5)
304 
305  ## @brief Contains the threads which will listen to the queue.
306  # possibility to add another queue to receive the results of the threads...
307  # lock problem ?? when updating, it should not be the same threads
308  self._list_thread_parser=[]
309  ## Create the parsers and append to _list_thread_parser.
310  self.create_threads_parser(NB_THREADS_PARSER)
311 
312  #if GlobalDicts.DEBUG_MODE :
313  print "init handler_queue"
314  print "list_active_stock"
315  print self.list_active_stock
316  print "list_inactive_stock"
317  print self.list_inactive_stock
318  #print "list_error_stock"
319  #print self.list_error_stock
320  self.logger.info("list active %s", self.list_active_stock)
321  self.logger.info("list inactive %s", self.list_inactive_stock)
322 
323  # test
324  def __del__(self):
325  print "\ndestructor handler_queue\n"
326 
327  #print "list_thread before deletion"
328  #for t in self.list_thread_parser:
329  # print t
330  ##stop first thread_parser
331  #for t in self.list_thread_parser:
332  ##t.__update=False
333  #del self.list_thread_parser[0]
334  #print "sleep(10)"
335  #time.sleep(10)
336  #del self.list_thread_parser[0]
337  #time.sleep(10)
338  #print "list_thread"
339  #for t in self.list_thread_parser:
340  # print t
341 
342  ## @brief Create and start the threads, called at initialisation.
343  # Threads are set as daemons and stored in self._list_thread_parser (should check about problems with daemons)
344  # @param nb_threads_parser number of threads to create (NB_THREADS_PARSER)
345  def create_threads_parser(self, nb_threads_parser):
346 
347  for i in range(nb_threads_parser):
348  name_thread = "t_parser-" + str(i)
349  t = thread_parser( self._queue_jobs, name_thread )
350  self._list_thread_parser.append(t)
351  # daemon thread: it does not prevent the main program from exiting
352  t.setDaemon(True)
353  t.start()
354 
355  ## @brief Fill the queue every TIMEPARSER sec.
356  # Run until self._running is set to False
357  # Check and change the state CLOSED, WAIT_OPEN of the Stocks.
358  def run(self):
359 
360  print ""
361  print "thread_handle_queue run() will update every ", TIMEPARSER, " sec."
362  print ""
363 
364  while self._running:
365 
366  # loop over a copy of the inactive stocks (the list is modified below), check if they will soon open
367  for symbol in self.list_inactive_stock[:]:
368 
369  # maybe should keep a link, or ask for the list ?
370  obj_stock = DictionaryStocks().get_stocks( symbol )
371 
372  # if state error should not add directly, but should check before with CheckOpen, or GetInstValue
373  # better create list_error_stock...
374  #if DictStocks[stock]['error']
375 
376  # get time of opening
377  time_to_open = obj_stock.time_to_open()
378  self.logger.debug("time_to_open %s %d" % (symbol,time_to_open.seconds))
379 
380  # test will open in less than 5 minutes-> send to list_active_stock
381  # if (time_to_open.days==0) and (time_to_open.seconds < 300):
382  if (time_to_open.days==0) and (time_to_open.seconds < 3600): #for test 1 hour
383 
384  # move to active_list
385  self.list_inactive_stock.remove(symbol)
386  self.list_active_stock.append(symbol)
387 
388  # set as 'WAIT_OPEN' in the queue, will stay to WAIT_OPEN with update
389  obj_stock.state = 'WAIT_OPEN'
390  self.logger.info("CLOSED->WAIT_OPEN, %s will open in %d seconds" % (symbol,time_to_open.seconds))
391 
392  # DO NOT insert to the queue, next loop of list_active_stock will do it
393 
394  # loop over a copy of the active stocks (the list is modified below), check if they need to be rerun
395  for symbol in self.list_active_stock[:]:
396 
397  obj_stock = DictionaryStocks().get_stocks( symbol )
398 
399  # still open, put again in the queue
400  # need specific test for ML
401  if obj_stock.state == 'OPEN':
402  #print "found OPEN symbol ", obj_stock.symbol
403  #print 'id ', id(obj_stock)
404 
405 # To move to Stock as well
406  # Particularity ML on Bourso, Closed until 15h, but stay open after until ???18h ??
407  # Test if state='OPEN' and date=today(), work has been done
408  if obj_stock.get_market() == 'ml_euro':
409 
410  # if true work has been done for today, to put as inactive
411  if obj_stock.test_ML_done():
412  #print "ML Market already done"
413  self.list_active_stock.remove(symbol)
414  self.list_inactive_stock.append(symbol)
415  # may forced to close as well, here better than parser
416  self.logger.info("ML market %s done, move to inactive", symbol)
417 
418  # all the other wait to be closed, send into the queue for a next round
419  else:
420  self.logger.debug("put stock, action %s %s" % (symbol,"InstValue"))
421  self._queue_jobs.put ( (symbol,"InstValue") )
422 
423  elif obj_stock.state == 'WAIT_OPEN':
424  self.logger.debug("WAIT_OPEN insert to the queue")
425  self._queue_jobs.put ( (symbol,"InstValue") )
426 
427 
428  # Possible only if it was OPEN before (or WAIT_OPEN, not anymore) and seen closed at the last update
429  # move into inactive stock
430  elif obj_stock.state == 'CLOSED':
431  self.list_active_stock.remove(symbol)
432  self.list_inactive_stock.append(symbol)
433  self.logger.info("%s OPEN->CLOSED, moved to inactive_stock", symbol)
434 
435  # no special loop for list_error_stock
436  # need function to retest on RELOAD for instance
437  # or function to retest depending on code error value
438 
439  #print "list active stock "
440  #print self.list_active_stock
441  #print "list_inactive_stock"
442  #print self.list_inactive_stock
443 
444  # use a elapsed time of X sec. to make the next loop
445  time.sleep ( TIMEPARSER )
446 
447 # called by Server option FORCE, but how to get the return ? certainly a test...
448 # Ok, found the point is to add a new stock in the queue, then updated normally, can call GET
449 # certainly for a method ADD_QUEUE
450 
451  ## @brief Function to force an update: add the stock to the active/inactive lists (no direct put in the queue for now).
452  # Called by the Server ?\n
453  # Implement a simple lock that the caller must check before returning ?
454  # @param stock name of the stock
455  # @param action only InstValue implemented
456  def InsertToQueue( self, stock, action ):
457  #print "InsertToQueue to put in the queue ", stock, action
458  action = "InstValue"
459 
460  self.logger.info("%s Force InsertToQueue", stock)
461 
462  # first must test it is correct
463  obj_stock = DictionaryStocks().get_stocks( stock )
464 
465  # may return an Invalid Stock
466  if obj_stock.is_valid():
467 
468  # insert to the queue, but did not insert in the list of active/inactive stock
469  # self._queue_jobs.put((stock,action))
470 
471  # insert to the list only; dangerous to insert into the queue AND the list
472  if obj_stock.state in ('OPEN', 'WAIT_OPEN'):
473  self.list_active_stock.append( stock )
474  elif obj_stock.state == 'CLOSED':
475  self.list_inactive_stock.append( stock )
476 
477  print "Insert To Queue new list active ", self.list_active_stock
478  print "Insert To Queue new list inactive ", self.list_inactive_stock
479  return True
480  else:
481  self.logger.info("InvalidStock discarded")
482  return False
483 
484  #Create a manual lock
485  #self.dict_stocks.dictstocks[stock]['lock'] = True
486  #if GlobalDicts.DEBUG_MODE:
487  # print 'lock stock ', stock, self.dict_stocks.dictstocks[stock]['lock']
488 
489 # ############# Main ################################
490 # to add, test when deleting queue, threads....
491 
492 if __name__ == "__main__":
493 
494  # if commented, will appear in doxygen documentation, see usage of EXCLUDE_SYMBOL
495  # http://stackoverflow.com/questions/4306133/how-to-exclude-variables-from-doxygen-output
496 
497  logging.basicConfig(level=logging.DEBUG)
498  # logging.basicConfig(filename='myapp.log', level=logging.INFO) # filemode='w' to not append to file
499  logger = logging.getLogger('SP')
500  main_logger = logging.getLogger("SP.main")
501 
502  # Load the dictionary and the configuration file
503  dict_stocks = DictionaryStocks( load_all_stocks=True )
504  #dict_stocks = DictionaryStocks()
505  # not a nice way to load a few stock, but it is not the point anyway
506  #dict_stocks.get_stocks(['CAC40','FSLR','EURUS'])
507 
508  time.sleep(2)
509 
510  print "FIRST: threading.enumerate ",threading.enumerate()
511  # run a thread for the queue
512  # to suppress ? to see level of interaction
513  tqh=Thread_Handle_Queue() # dict_stocks )
514  print "tqh.name ", tqh.name, " is Alive:", tqh.is_alive(), " is daemon:", tqh.daemon
515  time.sleep(5)
516  tqh.start()
517  # Alive after start()
518  print "tqh.name ", tqh.name, " is Alive:", tqh.is_alive(), " is daemon:", tqh.daemon
519 
520  time.sleep(20)
521  # print "dict_stocks in main"
522  # print dict_stocks
523 
524  print "SECOND: enumerate ",threading.enumerate()
525 
526  time.sleep(25)
527  # test for reloading dictstocks with a new value
528  # dict_stocks.ReadStocks('dictstocks.add1.txt')
529  # print dict_stocks
530 
531  # test force update...
532 
533  tqh._running=False
534 
535  # should try to set flag and join the thread here
536  del tqh
537 
538  time.sleep(3)
539  print "LAST : enumerate ",threading.enumerate()
540 
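The queue/worker pattern used by thread_parser and Thread_Handle_Queue can be sketched on its own. This is a minimal illustration, not the module's code: it is written for Python 3 (module `queue` instead of the listing's Python 2 `Queue`), and `fake_update`, `STOP`, `worker` and `run_batch` are hypothetical names standing in for `UpdateStocks.update_data()` and the real threads. It shows a bounded FIFO queue, daemon workers that survive a failing job, and a shutdown with a sentinel plus `join()`, which the "should try to set flag and join the thread here" remark in the main section hints at.

```python
import queue
import threading

STOP = None  # hypothetical sentinel telling a worker to exit


def fake_update(symbol):
    """Stand-in for UpdateStocks.update_data(); fails for one symbol."""
    if symbol == "BAD":
        raise IOError("simulated connection problem")
    return symbol.lower()


def worker(jobs, results, errors):
    """Consume (symbol, action) jobs until the STOP sentinel arrives."""
    while True:
        item = jobs.get()
        try:
            if item is STOP:
                return
            symbol, action = item
            if action != "InstValue":
                raise ValueError("unsupported action: %s" % action)
            results.append(fake_update(symbol))
        except Exception as exc:
            # Record the failure and keep the thread alive for the next job.
            errors.append((item, str(exc)))
        finally:
            jobs.task_done()


def run_batch(symbols, nb_threads=3):
    """Parse all symbols with nb_threads workers, then stop the workers."""
    jobs = queue.Queue(10)  # bounded FIFO queue, as in the listing
    results, errors = [], []
    threads = [
        threading.Thread(target=worker, args=(jobs, results, errors), daemon=True)
        for _ in range(nb_threads)
    ]
    for t in threads:
        t.start()
    for symbol in symbols:
        jobs.put((symbol, "InstValue"))
    for _ in threads:
        jobs.put(STOP)  # one sentinel per worker
    for t in threads:
        t.join()
    return sorted(results), errors
```

Calling `run_batch(["CAC40", "BAD", "FSLR"])` parses the two valid symbols and reports the failing one in the error list, without losing a worker thread.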
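The state rules CLOSED -> WAIT_OPEN -> OPEN -> CLOSED and the two lists of Thread_Handle_Queue can be illustrated with plain data structures. The sketch below is an assumption-laden simplification in Python 3: `promote_soon_to_open`, `retire_closed`, `WAIT_WINDOW` and the `states` / `time_to_open` dictionaries are hypothetical stand-ins for the Stock objects, and the ML market special case is left out. It mainly shows why the loops iterate over a copy of a list while moving symbols between the active and inactive lists.

```python
from datetime import timedelta

# Assumption: the 5-minute window mentioned in the comments of the listing.
WAIT_WINDOW = timedelta(minutes=5)


def promote_soon_to_open(inactive, active, states, time_to_open):
    """Move stocks that open within WAIT_WINDOW from inactive to active."""
    for symbol in list(inactive):  # copy: 'inactive' is edited in the loop
        delta = time_to_open[symbol]
        if timedelta(0) <= delta < WAIT_WINDOW:
            inactive.remove(symbol)
            active.append(symbol)
            states[symbol] = "WAIT_OPEN"


def retire_closed(inactive, active, states):
    """Move CLOSED stocks back to inactive; return the jobs to queue again."""
    to_parse = []
    for symbol in list(active):  # copy: 'active' is edited in the loop
        if states[symbol] == "CLOSED":
            active.remove(symbol)
            inactive.append(symbol)
        else:  # OPEN or WAIT_OPEN: parse again at the next round
            to_parse.append((symbol, "InstValue"))
    return to_parse
```

Without the copies, removing a symbol during iteration would skip the next element of the list, so two neighbouring stocks opening at the same time would not both be promoted in one round.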