Jupyter-Lab Strategy creator Scrape data Explore database

TA-Lib

Easy way to get technical analysis...




TA-Lib is a free library that computes common technical indicators from OHLCV (Open, High, Low, Close, Volume) data.
A Python wrapper for it also exists, ta-lib-python; it makes it easy to handle technical data and find patterns across many "tickers"...
Below is a simple class that can be used to get indicators from the data of a single ticker:


    #!/usr/bin/env python
    # coding:utf-8
    
    from numpy import empty as npEmpty, roll as npRoll
    from talib import stream, get_function_groups as talibGet_function_groups
    from talib.abstract import Function as TA_libFunction
    
    
    class TA_RealTime:
        """Rolling OHLCV buffer that feeds TA-Lib streaming indicators for one ticker.

        Every record pushed through ``add_data`` / ``add_async_data`` must hold six
        values, stored in this order: price, open, high, low, close, volume.
        Indicators are only evaluated once ``buffer_size`` records have been
        received; before that the results dictionary stays empty.

        Optional keyword arguments accepted by ``__init__``:
            - config : configuration object, stored as-is
            - logger : logger used to report indicators that fail to load
            - return_result : True / False, return the latest results after each
              record (default True)
            - buffer_size : number of records kept in the buffer (default 200)
            - indicators : TA-Lib function names to load, e.g. ["MACD", ...];
              every function outside the "Math..." groups is loaded when omitted
            - QPush : asyncio queue used to share results with a parent class
        """
        Name = "TA-RealTime"
        QPush = None
        config = None
        logger = None

        # Row order of ``ndArray`` and key order of ``df_like``.
        _ROWS = ("price", "open", "high", "low", "close", "volume")
        # Loaded indicators that are never evaluated by _RT_calculate_indicators.
        _UNSUPPORTED = frozenset(("stream_MAVP", "stream_BETA", "stream_CORREL", "stream_OBV"))

        def __init__(self, name, **kwargs):
            """Create the buffer for ticker *name* and load the requested indicators.

            Raises:
                ValueError: if ``buffer_size`` is smaller than 1.
            """
            self.Name = "{1}-{0}".format(self.Name, name)
            self.ticker = name
            self._RT_indicators = {}
            self._indicator_params = {}
            self._infos = {}
            self.df_like = {}

            # Only override the class-level defaults when the caller provided a value.
            if "config" in kwargs:
                self.config = kwargs["config"]
            if "logger" in kwargs:
                self.logger = kwargs["logger"]
            if "QPush" in kwargs:
                self.QPush = kwargs["QPush"]

            self.return_result = kwargs.get("return_result", True)
            self.ndArray_size = kwargs.get("buffer_size", 200)
            if self.ndArray_size < 1:
                raise ValueError("buffer_size must be >= 1, got {0}".format(self.ndArray_size))

            self._load_indicators(kwargs.get("indicators"))

            self.ndArray = npEmpty(shape=(len(self._ROWS), self.ndArray_size), dtype=float)
            # Next column to write. Starting at 0 keeps the warm-up view
            # ``ndArray[:, 0:ndArray_index]`` aligned with the records received so far.
            self.ndArray_index = 0
            self.num_records = 0
            self._latest = {}

        def _C_Buffer(self, data):
            """Store one record in the circular buffer, overwriting the oldest when full."""
            slot = self.ndArray_index
            for row in range(len(self._ROWS)):
                self.ndArray[row, slot] = data[row]
            # Always wrap, so the index points at the oldest column once the buffer is full.
            self.ndArray_index = (slot + 1) % self.ndArray_size
            if self.num_records < self.ndArray_size:
                self.num_records += 1

        def _RT_calculate_indicators(self):
            """Refresh ``df_like`` and, once the buffer is full, evaluate the indicators.

            Results are stored in ``self._latest`` and, when ``QPush`` is set and at
            least one indicator produced a value, pushed as ``{ticker: results}``.
            """
            if self.num_records < self.ndArray_size:
                # Warm-up: expose only the columns filled so far, no calculation yet.
                for row, column in enumerate(self._ROWS):
                    self.df_like[column] = self.ndArray[row, 0:self.ndArray_index]
                return

            # Rotate so that every series is ordered from oldest to newest.
            for row, column in enumerate(self._ROWS):
                self.df_like[column] = npRoll(self.ndArray[row], -self.ndArray_index)

            temp = {}
            for name, func in self._RT_indicators.items():
                if name in self._UNSUPPORTED:
                    continue
                input_names = self._indicator_params[name]
                params = input_names.get("prices") or input_names.get("price")
                if isinstance(params, str):
                    params = [params]
                if not params:
                    # No price input declared for this function: nothing to feed it.
                    continue
                value = func(*[self.df_like[param] for param in params])
                self._latest[name] = value
                temp[name] = value

            if self.QPush is not None and temp:
                self.QPush.put_nowait({self.ticker: temp})

        def _load_indicators(self, indicators=None):
            """Register the TA-Lib streaming functions to evaluate.

            ``indicators`` restricts loading to the given function names; ``None``
            loads every function outside the "Math..." groups.  Functions that
            cannot be loaded are reported through the logger (if any) and skipped.
            """
            if indicators is not None and hasattr(indicators, "__len__") and len(indicators) == 0:
                return  # empty selection: nothing to load

            for group, func_names in talibGet_function_groups().items():
                if group.startswith("Math"):
                    continue
                for func_name in func_names:
                    if indicators is not None and func_name not in indicators:
                        continue
                    try:
                        stream_func = getattr(stream, func_name)
                        self._RT_indicators[stream_func.__name__] = stream_func
                        self._indicator_params[stream_func.__name__] = TA_libFunction(func_name).input_names
                    except Exception as e:
                        if self.logger is not None:
                            self.logger.error("{0} : error while trying to load real time indicator '{1}' : {2}".format(self.Name, func_name, e))

        # can or should be overloaded in child class
        async def add_async_data(self, data, sock_requester):
            """Store *data*, refresh the indicators and send the latest results.

            When ``return_result`` is true, the latest results are sent with
            ``sock_requester.send_data`` and the value of that call is returned;
            otherwise nothing is sent and ``None`` is returned.
            """
            self._C_Buffer(data)
            self._RT_calculate_indicators()
            if self.return_result:
                return await sock_requester.send_data(self._latest)

        # can or should be overloaded in child class
        def add_data(self, data):
            """Store *data*, refresh the indicators and return the latest results.

            Returns ``self._latest`` when ``return_result`` is true, else ``None``.
            """
            self._C_Buffer(data)
            self._RT_calculate_indicators()
            if self.return_result:
                return self._latest
  

The class above can be used in a TCP server to track multiple "tickers" in near real time, or in a strategy class with a single ticker to get the required indicators even more quickly...


    #!/usr/bin/env python
    # coding:utf-8 
    
    from asyncio import run as asyncioRun, Lock as asyncioLock, start_server as asyncioStart_server
    
    # relative import
    from sys import path;path.extend("..")
    from common.Helpers.helpers import init_logger
    from common.Helpers.network_helpers import SafeAsyncSocket
    from analyst.Analysts.base.ta_base import TA_RealTime
    
    # Placeholders for the configuration and logger; both are rebound in the
    # __main__ section from the values returned by init_logger().
    config = None
    logger = None
    # Lock held by process_data() for the whole handling of one message.
    # NOTE(review): created at import time, before asyncioRun() starts the loop;
    # confirm this is fine on the Python version in use.
    asyncLock = asyncioLock()
    # TA_RealTime instances created so far, keyed by the message key (ticker).
    TA_RealTime_List = {}
    
    async def process_data(deserialized_data, sock_requester):
        """Handle one deserialized message received from a client.

        The first element of *deserialized_data* is removed and used as the key:

        - ``"get|<ticker>"`` or ``"get|<ticker>|<indicator>"`` (case-insensitive
          prefix): send back the latest results of that ticker, either all of
          them or only the named indicator.
        - any other key: treat it as a ticker name, create its ``TA_RealTime``
          instance on first use, and feed it the remaining elements.

        Every reply goes through ``await sock_requester.send_data(...)``.
        The whole message is handled while holding the module-level lock.
        """
        async with asyncLock:
            key = deserialized_data.pop(0)

            if key.lower().startswith("get|"):
                parts = key.split('|')
                broTick = parts[1]
                # An optional third field selects a single indicator.
                indic = parts[2] if len(parts) > 2 and parts[2] else "all"

                current_TA_obj = TA_RealTime_List.get(broTick)
                if current_TA_obj is None:
                    await sock_requester.send_data("TA indicators for {0} currently not calculated...".format(broTick))
                    return

                try:
                    result = current_TA_obj._latest if indic == "all" else current_TA_obj._latest[indic]
                except KeyError as e:
                    await sock_requester.send_data("error while trying to get indicator '{0}' data for {1} : {2}".format(indic, broTick, e))
                    return
                await sock_requester.send_data(result)

            else:
                current_TA_obj = TA_RealTime_List.get(key)
                if current_TA_obj is None:
                    current_TA_obj = TA_RealTime(name=key, config=config, logger=logger)
                    TA_RealTime_List[key] = current_TA_obj
                await current_TA_obj.add_async_data(deserialized_data, sock_requester)
    
    ########################################################################
    # Async Tcp server
    async def handle_TCP_client(reader, writer):
        """Serve one TCP client until it stops sending data.

        The first message must be a ``"<clientName>:<host>:<port>"`` string; it
        is only used for the connection log entry.  Every following message is
        passed to ``process_data``.  The writer is closed on every exit path,
        including when the handshake is malformed or a message handler raises
        (the exception still propagates to the caller).
        """
        asyncSock = SafeAsyncSocket(reader=reader, writer=writer)
        try:
            data = await asyncSock.receive_data()
            if not data:
                return
            clientName, host, port = data.split(':')
            port = int(port)
            # NOTE(review): `name` is a module global assigned in the __main__ section.
            await logger.asyncInfo("{0} : '{1}' has established connection without encryption from '{2}' destport '{3}'".format(name, clientName, host, port))
            while True:
                data = await asyncSock.receive_data()
                if not data:
                    break
                await process_data(deserialized_data=data, sock_requester=asyncSock)
        finally:
            # Release the connection whatever happened above.
            asyncSock.writer.close()
            await asyncSock.writer.wait_closed()
    
    async def start_tcp_server():
        """Start the asyncio TCP server and serve clients until interrupted.

        The listening address and port are read from the ``REALTIMETA`` section
        of ``config.parser`` (keys ``RT_TA_SERVER`` and ``RT_TA_PORT``).  The
        server object is published as the module global ``async_tcp_server``.
        Any error other than ``KeyboardInterrupt`` is logged and ends the
        process with exit status 1.
        """
        global async_tcp_server
        try:
            section = config.parser["REALTIMETA"]
            async_TCP_IP = section["RT_TA_SERVER"]
            async_TCP_port = int(section["RT_TA_PORT"])
            async_tcp_server = await asyncioStart_server(handle_TCP_client, async_TCP_IP, async_TCP_port)
            await logger.asyncInfo("{0} async TCP : socket async TCP handler is open : {1}, srcport {2}".format(name, async_TCP_IP, async_TCP_port))
            await async_tcp_server.serve_forever()
        except KeyboardInterrupt:
            pass
        except Exception as e:
            await logger.asyncError("{0} : error while trying to start TCP server : '{1}'".format(name, e))
            # Same effect as exit(1) without relying on the helper injected by `site`.
            raise SystemExit(1)
    
    
    #================================================================
    if __name__ == "__main__":
        from os.path import basename as osPathBasename
        from sys import argv

        CONFIGFILE = "analyst"

        # Service name: the script's file name without extension, unless exactly
        # one command-line argument overrides it; always lower-cased.
        name = (osPathBasename(__file__)).split('.')[0]
        if len(argv) == 2:
            name = argv[1]
        name = name.lower()

        # Rebind the module-level config and logger used by the server coroutines.
        config, logger = init_logger(name=name, config=CONFIGFILE)
        mem_section = name.upper()

        # Run the TCP server until it stops.
        asyncioRun(start_tcp_server())