TA-Lib
Easy way to get technical analysis...
TA-Lib is a free library that can calculate some common technical indicators with current OHLCV (Open, High, Low, Close, Volume) datas.
It also exist a python lib based on this : ta-lib-python ; that lib can be used to easely handle all technical datas and find patterns on many "tickers"...
Below a simple class that can be used to get indicators from one ticker data :
#!/usr/bin/env python
# coding:utf-8
from numpy import empty as npEmpty, roll as npRoll
from talib import stream, get_function_groups as talibGet_function_groups
from talib.abstract import Function as TA_libFunction
class TA_RealTime:
"""
optional init named parameters :
- config : config
- logger : logger
- return_result : True / False, return result after calculation
- buffer_size : size of the buffer
- indicators : list of indicators [MACD,ICHIKOMU,...]
- QPush : asyncioQueue to share results in another parent class
"""
Name = "TA-RealTime"
QPush = None
config = None
logger = None
def __init__(self, name, **kwargs):
self.Name = "{1}-{0}".format(self.Name, name)
self.ticker = name
self._RT_indicators = {} ; self._indicator_params = {} ; self._infos = {} ; self.df_like = {}
if "config" in kwargs: self.config = kwargs["config"]
if "logger" in kwargs: self.logger = kwargs["logger"]
self.return_result = kwargs["return_result"] if "return_result" in kwargs else True
self.ndArray_size = kwargs["buffer_size"] if "buffer_size" in kwargs else 200
if "QPush" in kwargs: self.QPush = kwargs["QPush"]
self._load_indicators(kwargs["indicators"]) if "indicators" in kwargs else self._load_indicators()
self.ndArray = npEmpty(shape=(6, self.ndArray_size), dtype=float) ; self.ndArray_index = -1 ; self.num_records = 0
self._latest = {}
def _C_Buffer(self, data):
# circular buffer
if self.num_records > self.ndArray_size-1:
self.ndArray[0, self.ndArray_index] = data[0]
self.ndArray[1, self.ndArray_index] = data[1]
self.ndArray[2, self.ndArray_index] = data[2]
self.ndArray[3, self.ndArray_index] = data[3]
self.ndArray[4, self.ndArray_index] = data[4]
self.ndArray[5, self.ndArray_index] = data[5]
self.ndArray_index = (self.ndArray_index + 1) % self.ndArray_size
else:
self.ndArray[0, self.ndArray_index] = data[0]
self.ndArray[1, self.ndArray_index] = data[1]
self.ndArray[2, self.ndArray_index] = data[2]
self.ndArray[3, self.ndArray_index] = data[3]
self.ndArray[4, self.ndArray_index] = data[4]
self.ndArray[5, self.ndArray_index] = data[5]
self.ndArray_index = (self.ndArray_index + 1)
self.num_records += 1
def _RT_calculate_indicators(self):
# start calculation if buffer full
if self.num_records > self.ndArray_size-1:
self.df_like['price'] = npRoll(self.ndArray[0], -self.ndArray_index)
self.df_like['open'] = npRoll(self.ndArray[1], -self.ndArray_index)
self.df_like['high'] = npRoll(self.ndArray[2], -self.ndArray_index)
self.df_like['low'] = npRoll(self.ndArray[3], -self.ndArray_index)
self.df_like['close'] = npRoll(self.ndArray[4], -self.ndArray_index)
self.df_like['volume'] = npRoll(self.ndArray[5], -self.ndArray_index)
temp = {}
for name, func in self._RT_indicators.items():
params = self._indicator_params[name].get("prices") or self._indicator_params[name].get("price")
if type(params) == str : params = [params]
if name != "stream_MAVP" and name != "stream_BETA" and name != "stream_CORREL" and name != "stream_OBV":
if len(params) > 0:
value = func(*[self.df_like[param] for param in params])
else :
value = func()
self._latest[name] = value
temp[name] = value
if not self.QPush is None:
if len(temp) > 0:
self.QPush.put_nowait({self.ticker:temp})
else:
self.df_like['price'] = self.ndArray[0, 0:self.ndArray_index]
self.df_like['open'] = self.ndArray[1, 0:self.ndArray_index]
self.df_like['high'] = self.ndArray[2, 0:self.ndArray_index]
self.df_like['low'] = self.ndArray[3, 0:self.ndArray_index]
self.df_like['close'] = self.ndArray[4, 0:self.ndArray_index]
self.df_like['volume'] = self.ndArray[5, 0:self.ndArray_index]
def _load_indicators(self, indicators=None):
if not indicators is None:
for key, func_list in talibGet_function_groups().items():
if not key.startswith("Math"):
for name in func_list:
if name in indicators:
try:
stream_func = getattr(stream, name)
self._RT_indicators[stream_func.__name__] = stream_func
self._indicator_params[stream_func.__name__] = TA_libFunction(name).input_names
except Exception as e:
if not self.logger is None:
self.logger.error("{0} : error while trying to load real time indicator '{1}' : {2}".format(self.Name, name, e))
continue
else:
for key, func_list in talibGet_function_groups().items():
if not key.startswith("Math"):
for name in func_list:
try:
stream_func = getattr(stream, name)
self._RT_indicators[stream_func.__name__] = stream_func
self._indicator_params[stream_func.__name__] = TA_libFunction(name).input_names
except Exception as e:
if not self.logger is None:
self.logger.error("{0} : error while trying to load real time indicator '{1}' : {2}".format(self.Name, name, e))
continue
# can or should be overloaded in child class
async def add_async_data(self, data, sock_requester):
self._C_Buffer(data)
self._RT_calculate_indicators()
if self.return_result:
return await sock_requester.send_data(self._latest)
# can or should be overloaded in child class
def add_data(self, data):
self._C_Buffer(data)
self._RT_calculate_indicators()
if self.return_result:
return self._latest
This class above, can be used in a TCP server for the tracking of multiple "tickers" at almost real time, or a strategy class with one tickers to get the required indicators even more quickly...
#!/usr/bin/env python
# coding:utf-8
from asyncio import run as asyncioRun, Lock as asyncioLock, start_server as asyncioStart_server
# relative import
from sys import path;path.extend("..")
from common.Helpers.helpers import init_logger
from common.Helpers.network_helpers import SafeAsyncSocket
from analyst.Analysts.base.ta_base import TA_RealTime
config = None
logger = None
asyncLock = asyncioLock()
TA_RealTime_List = {}
async def process_data(deserialized_data, sock_requester):
global asyncLock ; global TA_RealTime_List
async with asyncLock:
key = deserialized_data.pop(0)
if key.lower().startswith("get|"):
broTick = key.split('|')[1] ; indic = "all"
if "|" in broTick: broTick, indic = broTick.split('|')
try:
current_TA_obj = TA_RealTime_List[broTick]
except:
sock_requester.send_data("TA indicators for {0} currently not calculated...".format(broTick))
pass
try:
if indic == "all":
sock_requester.send_data(current_TA_obj._latest)
else:
sock_requester.send_data(current_TA_obj._latest[indic])
except Exception as e:
sock_requester.send_data("error while trying to get indicator '{0}' data for {1} : {2}".format(indic, broTick, e))
else:
try:
current_TA_obj = TA_RealTime_List[key]
except:
current_TA_obj = TA_RealTime(name=key, config=config, logger=logger)
TA_RealTime_List[key] = current_TA_obj
await current_TA_obj.add_async_data(deserialized_data, sock_requester)
########################################################################
# Async Tcp server
async def handle_TCP_client(reader, writer):
asyncSock = SafeAsyncSocket(reader=reader, writer=writer)
data = await asyncSock.receive_data()
if not data:
asyncSock.writer.close()
await asyncSock.writer.wait_closed()
return
clientName, host, port = data.split(':') ; port = int(port)
await logger.asyncInfo("{0} : '{1}' has established connection without encryption from '{2}' destport '{3}'".format(name, clientName, host, port))
while True:
data = await asyncSock.receive_data()
if not data:
break
await process_data(deserialized_data=data, sock_requester=asyncSock)
asyncSock.writer.close()
await asyncSock.writer.wait_closed()
async def start_tcp_server():
global async_tcp_server
try:
async_TCP_IP = config.parser["REALTIMETA"]["RT_TA_SERVER"] ; async_TCP_port = int(config.parser["REALTIMETA"]["RT_TA_PORT"])
async_tcp_server = await asyncioStart_server(handle_TCP_client, async_TCP_IP, async_TCP_port)
await logger.asyncInfo("{0} async TCP : socket async TCP handler is open : {1}, srcport {2}".format(name, async_TCP_IP, async_TCP_port))
await async_tcp_server.serve_forever()
except KeyboardInterrupt:
pass
except Exception as e:
await logger.asyncError("{0} : error while trying to start TCP server : '{1}'".format(name, e))
exit(1)
#================================================================
if __name__ == "__main__":
from sys import argv
CONFIGFILE = "analyst"
from os.path import basename as osPathBasename
name = (osPathBasename(__file__)).split('.')[0]
if len(argv) == 2:
name = argv[1]
name = name.lower()
# loading config and logger
config, logger = init_logger(name=name, config=CONFIGFILE)
mem_section = name.upper()
# start TCP Server
asyncioRun(start_tcp_server())