Distributed Framework
Inter Local Queue communication and object sharing over network...
Not relevant anymore... as I won't use only python programming language
Will be replaced by TCP socket communication and Google protobuf/Cap'n proto for some processes and Aeron.io for others (with a TCP entry point for external sub/pub)
I have check some distributed python frameworks, I finally find "RPyC" really intersting, but the more I learnt how it works, the more I was thinking that I should make my own simple one...
#!/usr/bin/env python
# coding:utf-8
from os import getpid as osGetpid, getcwd as osGetcwd
from os.path import join as osPathjoin
from sys import argv
from uuid import uuid4
from collections import deque
from asyncio import run as asyncioRun, start_server as asyncioStart_server, \
Lock as asyncioLock, get_event_loop as asyncioGet_event_loop
# relative import
from sys import path;path.extend("..")
from common.ThreadQs.routing_rules import LostQmsg
from common.ThreadQs.aStarQ import aQSocket
from common.TeleRemote.tele_command import ATelecommand
from common.Helpers.helpers import Singleton, init_logger, getOrDefault
from common.Helpers.os_helpers import is_process_running, launch_monitoring_server, launch_watch_dog_server
from common.Helpers.network_helpers import SafeAsyncSocket, SSL_server_context, get_my_public_ip
#from trading.trading_helpers import refresh_trading_component
###########################################
# IP_INTERNAL is FIXED and HARD_CODED #
# IP_EXTERNAL is FIXED and HARD_CODED #
###########################################
# This variables are set in environ variable also
AMATRIXQ_IP_INTERNAL = '127.0.0.1'
AMATRIXQ_IP_EXTERNAL = '0.0.0.0'
# This variables are set in environ variable also
AMATRIXQ_NAME = "AMatrixQ"
DEFAULT_INTERNAL_PORT = 9001
DEFAULT_EXTERNAL_PORT = 9002
asyncLock = asyncioLock()
class Qmsg:
def __init__(self, msg, frome, too, ackw=False, priority=False):
self.id = uuid4()
self.msg = msg
self.frome = frome
self.too = too
self.ackw = ackw
self.priority = priority
########################################################################################################################################
class Peer:
def __init__(self, name, ip, port, writer, cmdLine=None, logPath=None, pid=None):
self.name = name
self.ip = ip
self.port = port
self.writer = writer
self.cmdLine = cmdLine
self.logPath = logPath
self.pid = pid
def __call__(self, *args, **kwargs):
for x in args:
self.__dict__.update({x:None})
self.__dict__.update(kwargs)
class AMatrixQ(ATelecommand, metaclass=Singleton):
Name = AMATRIXQ_NAME
Peers = {}
LocalPeers = {}
LocalMonitored = {}
clients = {} # traders
asyncLoop = None
def __init__(self, config, logger, name:str=None, asyncLoop=None, internal_port:int=DEFAULT_INTERNAL_PORT, external_port:int=DEFAULT_EXTERNAL_PORT, autorun=True, *args, **kwargs):
if not name is None:
self.Name = name
self.config = config
self.logger = logger
self.watch_dog = None
self.teleAsyncHook = None
self.state = "inited"
# async loop
if not asyncLoop is None: self.asyncLoop = asyncLoop
else: self.asyncLoop = asyncioGet_event_loop()
# internal connection server
self.async_tcp_server = None
self.internal_port = internal_port
# external connection server
self.async_tcp_ssl_server = None
self.external_port = external_port
# telecommand
self.TeleBufQ = deque()
if autorun:
self.run_forever(internal_port, external_port)
####################################################################
# Run servers forever
def run_forever(self, internal_port=DEFAULT_INTERNAL_PORT, external_port=DEFAULT_EXTERNAL_PORT):
# telecommand part
self.teleAsyncHook = self.asyncLoop.create_task(self.TeleCommand())
# manage message to aMatrixQ
self.MatrixQSocket = aQSocket(forward_message=self.forward_message, process_my_message=self.process_my_message)
# internal communication
internal_TCP_server = self.asyncLoop.create_task(self.run_TCP_server(host=AMATRIXQ_IP_INTERNAL, port=internal_port))
# external communication
external_TCP_SSL_server = self.asyncLoop.create_task(self.run_TCP_SSL_server(host=AMATRIXQ_IP_EXTERNAL, port=external_port))
# run aMatrixQ socket
self.asyncLoop.create_task(self._register_subs(name=self.Name, ip="_._._._", port=-1, writer=self.MatrixQSocket, cmdLine=" ".join(argv), logPath=osPathjoin(osGetcwd(),"logs", "{0}.log".format(self.Name.lower())), pid=osGetpid()))
self.asyncLoop.create_task(self.MatrixQSocket.received_data())
# watch_dog
self.asyncLoop.create_task(self.start_watch_dog())
# monitoring if not already runs...
self.asyncLoop.create_task(self.start_monitoring())
# run_forever
self.asyncLoop.run_forever()
####################################################################
# AMatrixQ methods
async def _register_subs(self, name, ip, port, writer, cmdLine=None, logPath=None, pid=None):
global asyncLock
async with asyncLock:
self.Peers[name] = Peer(name=name, ip=ip, port=port, writer=writer, cmdLine=cmdLine, logPath=logPath, pid=pid)
if ip == "127.0.0.1":
self.LocalPeers[name] = self.Peers[name]
async def _unregister_subs(self, name):
global asyncLock
async with asyncLock:
self.Peers.pop(name)
if name in self.LocalPeers:
self.LocalPeers.pop(name)
# manage messages coming to aMatrixQ
async def process_my_message(self, data):
#if data.msg==...
message="coucou"
return Qmsg(msg=message, frome=self.Name, too=data.frome)
####################################################################
# message transfert part
async def get_response(self, msg):
global asyncLock
async with asyncLock:
try:
writer = self.Peers[msg.too].writer
except Exception as e:
LostQmsg(msg)
await self.logger.asyncError("{0} : error while trying to get response message : {1}".format(self.Name, e))
return await writer.send_data(msg)
async def forward_message(self, msg):
global asyncLock
async with asyncLock:
try:
writer = self.Peers[msg.too].writer
except Exception as e:
LostQmsg(msg)
await self.logger.asyncError("{0} : error while trying to forward message : {1}".format(self.Name, e))
await writer.send_data(msg)
####################################################################
# internal TCP communication
@staticmethod
async def handle_TCP_client(reader, writer, iAMatrixQ):
asyncSock = SafeAsyncSocket(reader=reader, writer=writer)
data = await asyncSock.receive_data()
if not data:
asyncSock.writer.close()
await asyncSock.writer.wait_closed()
return
clientName, host, port, cmdLine, logPath, pid = data.split(':') ; port = int(port)
await iAMatrixQ._register_subs(name=clientName, ip=host, port=port, writer=asyncSock, cmdLine=cmdLine, logPath=logPath, pid=pid)
await iAMatrixQ.logger.asyncInfo("{0} : '{1}' has established connection without encryption from '{2}' destport '{3}'".format(iAMatrixQ.Name, clientName, host, port))
while True:
data = await asyncSock.receive_data()
if not data:
break
if data.ackw:
response = await iAMatrixQ.get_response(msg=data)
else:
await iAMatrixQ.forward_message(msg=data)
response = None
if not response is None:
asyncSock.send_data(response)
asyncSock.writer.close()
await asyncSock.writer.wait_closed()
await iAMatrixQ._unregister_subs(name=clientName)
async def run_TCP_server(self, host, port):
self.async_tcp_server = await asyncioStart_server(lambda reader, writer: AMatrixQ.handle_TCP_client(reader=reader, writer=writer, iAMatrixQ=self), host=host, port=port)
await self.logger.asyncInfo("{0} async TCP server : socket async TCP handler is open : {1}, srcport {2}".format(self.Name, host, port))
async with self.async_tcp_server:
await self.async_tcp_server.serve_forever()
####################################################################
# external TCP SSL communication
@staticmethod
async def handle_TCP_SSL_client(reader, writer, eAMatrixQ):
asyncSock = SafeAsyncSocket(reader=reader, writer=writer)
data = await asyncSock.receive_data()
if not data:
asyncSock.writer.close()
await asyncSock.writer.wait_closed()
return
clientName, host, port = data.split(':') ; port = int(port)
await eAMatrixQ._register_subs(name=clientName, ip=host, port=port, writer=asyncSock)
await eAMatrixQ.logger.asyncInfo("{0} : '{1}' has established connection with encryption '{2}' from '{3}' destport '{4}'".format(eAMatrixQ.Name, clientName, asyncSock.writer.transport.get_extra_info('ssl_object').version(), clientName, host, port))
while True:
data = await asyncSock.receive_data()
if not data:
break
if data.ackw:
response = await eAMatrixQ.get_response(msg=data)
else:
await eAMatrixQ.forward_message(msg=data)
response = None
if not response is None:
asyncSock.send_data(response)
asyncSock.writer.close()
await asyncSock.writer.wait_closed()
await eAMatrixQ._unregister_subs(name=clientName)
async def run_TCP_SSL_server(self, host, port):
from common.Helpers.network_helpers import SSL_test_context
self.async_tcp_ssl_server = await asyncioStart_server(lambda reader, writer: AMatrixQ.handle_TCP_SSL_client(reader=reader, writer=writer, eAMatrixQ=self), host=host, port=port, ssl=SSL_test_context())
await self.logger.asyncInfo("{0} async TCP SSL server : socket async TCP SSL handler is open : {1}, srcport {2}".format(self.Name, host, port))
async with self.async_tcp_ssl_server:
await self.async_tcp_ssl_server.serve_forever()
#####################################################################
# start watch_dog
async def start_watch_dog(self):
pass
#if not is_process_running(cmdlinePatt="watch_dog"):
# _ = launch_watch_dog_server(conf=self.config.COMMON_FILE_PATH)
# await self.logger.asyncInfo("{0} : watch server is starting.. . .".format(self.Name))
#self.watch_dog = get_watch_dog()
#####################################################################
# start monitoring
async def start_monitoring(self):
if not is_process_running(cmdlinePatt="monitoring"):
_ = launch_monitoring_server(conf=self.config.COMMON_FILE_PATH)
await self.logger.asyncInfo("{0} : main monitoring server is starting.. . .".format(self.Name))
#####################################################################
## Telecommand part
def get_asyncLock(self):
global asyncLock
return asyncLock
def telePortMe(self):
# do something
pass
#================================================================
if __name__ == "__main__":
from sys import argv
name = AMATRIXQ_NAME
configStr = "current"
name = name.lower()
config, logger = init_logger(name=name, config=configStr)
mainQ = AMatrixQ(config=config, logger=logger, autorun=False)
asyncioRun(mainQ.run_forever())
- Socket transfert seems fully working, deep tests have to be made...
- Network object sharing part has to be made...
- My custom socket object has to be overload with SSH connection...
- Crypted os environment variable container (inline json) has to be implemented at virtual env level, if possible...
# TODO