
Local Queue

Inter-process communication inside one machine



Not relevant anymore... as I will no longer use only the Python programming language

It will be replaced by TCP socket communication with Google Protobuf / Cap'n Proto for some processes, and Aeron.io for others (with a TCP entry point for external pub/sub)



By using the internal loopback interface of a machine (IP address 127.0.0.1), the TCP protocol and sockets can be used to build a coherent "pool" of services or functionalities.

To speed up and simplify the building of such coherent services, a message queue framework is often used: Kafka, ZeroMQ, RabbitMQ...
These frameworks are often heavy, require a lot of configuration and expertise, and are not always easy to customize (or only with a lot of work).
So I find it interesting to build a simple message queue that fits my needs...

This simple message queue only uses the power of asyncio's socket server (asyncio works around one of Python's weaknesses: the GIL). By default this message queue comes with a batch of specific processes (not shown here) and is very modular... With this approach, no special framework library is needed: a TCP socket with the correct message format is enough, so any programming language can take part without anything more...
Interoperability is as good as with the HTTP protocol, but faster and more efficient than HTTP!



    #!/usr/bin/env python
    # coding:utf-8
    
    
    from os.path import dirname as osPathDirname, join as osPathJoin
    from uuid import uuid4
    from collections import deque
    from asyncio import Lock as asyncioLock, Queue as asyncioQueue, run as asyncioRun, all_tasks as asyncioAll_tasks, sleep as asyncioSleep, \
                        start_server as asyncioStart_server, get_event_loop as asyncioGet_event_loop, wrap_future as asyncioWrap_future, \
                        CancelledError as asyncioCancelledError
    from flask import Flask, request, render_template, jsonify
    
    
    # relative import
    from sys import path; path.append("..")
    from common.ThreadQs.routing_rules import LostQmsg
    from common.TeleRemote.tele_command import ATelecommand
    from common.Helpers.helpers import Singleton, init_logger, getUnusedPort
    from common.Helpers.os_helpers import start_independant_process
    from common.Helpers.network_helpers import SafeAsyncSocket
    #from trading.trading_helpers import refresh_trading_component
    
    
    DEFAULT_RETRY_PERIOD = 1
    
    asyncLock = asyncioLock()
    
    class Qmsg:
        def __init__(self, msg, frome, too, ackw=None, priority=False):
            self.id = uuid4()
            self.msg = msg
            self.frome = frome
            self.too = too
            self.ackw = ackw
            self.priority = priority
    
    
    ########################################################################################################################################
    
    class aQSocket():
        """
        Class simulating a socket for incoming messages to aMatrixQ,
        or for managing Subs tasks in AStarQ...
        """
        def __init__(self, process_my_message, forward_message):
            self.aMatrixQueue = asyncioQueue()
            self.process_my_message = process_my_message
            self.forward_message = forward_message
        async def send_data(self, data):
            await self.aMatrixQueue.put(data)
        async def received_data(self):
            while True:
                data = await self.aMatrixQueue.get()
                msg = await self.process_my_message(data)
                await self.forward_message(msg)
                self.aMatrixQueue.task_done()
    
    
    class AStarQ(ATelecommand, metaclass=Singleton):
        """ 
        For AStarQ.subs
        *args should contains tuples of tuples :
            (  
                ("name1", Func1_ptr | "module.func1", childProc=False), 
                ("name2", Class2_ptr | "module.class2", childProc=True)
            )
        For subs (func or class) params
        **kwargs should contains a dict of params for the task or childproc
            {   
                name1: { "name1":{"func1_named_arg":"arg1_1"} }, 
                name2: { "name2":{"class2_named_arg":"arg2_1"} }
            }
        """
        Name = "AStarQ"
        asyncLoop = None
        def __init__(self, config, logger, asyncLoop=None, name:str=None, host:str="127.0.0.1", port:int=int(getUnusedPort()), autorun=True, *args, **kwargs):
            if name is not None:
                self.Name = name
            self.config = config
            self.logger = logger
            self.host = host ; self.port = port
            self.clients = {} # subscribers (traders, ...)
            self.teleAsyncHook = None
            self.state = "inited"

            # async loop
            if asyncLoop is not None: self.asyncLoop = asyncLoop
            else: self.asyncLoop = asyncioGet_event_loop()
    
            # telecommand
            self.TeleBufQ = deque()
    
            #for arg in args:
            #    self.subs(arg, kwargs)
              
            if autorun:
                self.run_forever()
    
        def run_forever(self):
            # telecommand
            self.teleAsyncHook = self.asyncLoop.create_task(self.TeleCommand())
            # Tcp part
            self.asyncLoop.create_task(self.run_TCP_server(), name="run_TCP_server")
            # monitoring
            doNotWaitThread = self.asyncLoop.run_in_executor(None, self.start_webserver, self.Name)
            _ = asyncioWrap_future(doNotWaitThread)
            # run_forever
            self.asyncLoop.run_forever()
    
        ####################################################################
        # AStarQ methods
        async def _register_subs(self, name, writer):
            global asyncLock
            async with asyncLock:
                self.clients[name] = writer
                
        def _get_subs(self, TaskOrChildProc, childproc=False):
            # resolve a subscriber given either a callable or its name as a string
            if isinstance(TaskOrChildProc, str):
                if childproc:
                    return globals()["info_module"].get(TaskOrChildProc)
                else:
                    return globals().get(TaskOrChildProc)
            else:
                return TaskOrChildProc
        # @refresh_trading_component
        async def Subs(self, name, subs, childProc=False, **kwargs):
            if not childProc:
                # local subs (note: forward_message()/get_response() expect the registered
                # object to expose send_data(), like aQSocket or SafeAsyncSocket)
                try:
                    cur_kwargs = kwargs.pop(name)
                except KeyError:
                    cur_kwargs = {}
                subs = self._get_subs(TaskOrChildProc=subs)
                self.asyncLoop.create_task(subs(**cur_kwargs), name=name)
                await self._register_subs(name=name, writer=subs)
            else:
                # process subs : spawn an independent child process that connects back over TCP
                try:
                    args = kwargs.pop(name) ; cur_kwargs = {"host":self.host, "port":self.port} | args
                except KeyError:
                    cur_kwargs = {"host":self.host, "port":self.port}
                childProcCmd = self._get_subs(TaskOrChildProc=subs, childproc=True)
                start_independant_process(childProcCmd, argv=tuple(cur_kwargs.values()))
            
        async def _unregister_subs(self, name):
            global asyncLock
            async with asyncLock:
                self.clients.pop(name, None)
        async def un_Subs(self, name):
            await self._unregister_subs(name=name)
            subsTask = None
            for task in asyncioAll_tasks():
                if task.get_name() == name:
                    subsTask = task
                    break
            if subsTask is not None:
                # local subs : cancel the task and wait for it to finish
                subsTask.cancel()
                try:
                    await subsTask
                except asyncioCancelledError:
                    pass
            else :
                # also unregister while exiting TCP connection, in handle_TCP_client... 
                await self.get_response(Qmsg(msg=False, too=name, frome=self.Name))
    
        ####################################################################
        # message transfert part
        async def get_response(self, msg):
            global asyncLock
            async with asyncLock:
                try:
                    client = self.clients[msg.too]
                except Exception as e:
                    LostQmsg(msg)
                    await self.logger.asyncError("{0} : error while trying to get response message : {1}".format(self.Name, e))
                    return None
            return await client.send_data(msg)

        async def forward_message(self, msg):
            global asyncLock
            async with asyncLock:
                try:
                    client = self.clients[msg.too]
                except Exception as e:
                    LostQmsg(msg)
                    await self.logger.asyncError("{0} : error while trying to forward message : {1}".format(self.Name, e))
                    return
            await client.send_data(msg)
    
        ####################################################################
        # aStarQ heart
        @staticmethod
        async def handle_TCP_client(reader, writer, aStarQ):
            asyncSock = SafeAsyncSocket(reader=reader, writer=writer)
            data = await asyncSock.receive_data()
            if not data:
                asyncSock.writer.close()
                await asyncSock.writer.wait_closed()
                return
            clientName, host, port = data.split(':') ; port = int(port)
            # register the SafeAsyncSocket so forward_message()/get_response() can call send_data() on it
            await aStarQ._register_subs(name=clientName, writer=asyncSock)
            await aStarQ.logger.asyncInfo("{0} : '{1}' has established connection without encryption from '{2}' destport '{3}'".format(aStarQ.Name, clientName, host, port))
            while True:
                data = await asyncSock.receive_data()
                if not data:
                    break
                if data.priority:
                    response = await aStarQ.get_response(msg=data)
                else:
                    await aStarQ.forward_message(msg=data)
                    response = None
                if response is not None:
                    await asyncSock.send_data(response)
                    await asyncSock.writer.drain()
                    await aStarQ.logger.asyncInfo("{0} async TCP : response '{1}' sent to '{2}'".format(aStarQ.Name, response, clientName))
            asyncSock.writer.close()
            await asyncSock.writer.wait_closed()
            await aStarQ._unregister_subs(name=clientName)
    
        async def run_TCP_server(self):
            self.async_tcp_server = await asyncioStart_server(lambda r, w: self.handle_TCP_client(r, w, self), self.host, self.port)
            await self.logger.asyncInfo("{0} async TCP : socket async TCP handler is open : {1}, srcport {2}".format(self.Name, self.host, self.port))
            await self.async_tcp_server.serve_forever()
    
        #####################################################################
        ## Telecommand part
        def get_asyncLock(self):
            global asyncLock
            return asyncLock
    
        #####################################################################    
        ## webServer view
        def start_webserver(self, name, host_port=int(getUnusedPort())):
            try :
                web_template_folder = osPathJoin(osPathDirname(__file__), "templates")
                web_static_folder = osPathJoin(osPathDirname(__file__), "static")
    
                app = Flask(name, template_folder=web_template_folder, static_folder=web_static_folder)
                #self.webserver = host_port
                app.config['SERVER_NAME'] = "127.0.0.1:{0}".format(host_port)
                self.logger.info("{0} : flask web server is available here : http://{1}:{2}".format(self.Name, self.host, host_port))
    
                # main http endpoint
                @app.route('/')
                def index():
                    return render_template('test.html')
    
                app.run()
            except Exception as e:
                self.logger.critical("{0} : error while trying to start web interface : {1}".format(name.capitalize(), e))
                exit(1)
    
    #================================================================
    if __name__ == "__main__":
        configStr = "current"
        config, logger = init_logger(name=AStarQ.Name.lower(), config=configStr) 
        # for testing purposes : run_forever() blocks on the asyncio loop
        mainQ = AStarQ(config=config, logger=logger, autorun=False)
        mainQ.run_forever()
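
For illustration, here is a minimal sketch of what an external (possibly non-Python) subscriber could look like on the wire. The exact framing depends on SafeAsyncSocket, which is not shown here; this sketch assumes a simple 4-byte length prefix followed by a pickled payload, and the names "send_framed", "recv_framed", "NAME" and the port number are hypothetical.

    #!/usr/bin/env python
    # coding:utf-8
    # Hypothetical external subscriber sketch.
    # Assumption : SafeAsyncSocket frames messages as <4-byte big-endian length><pickle payload>;
    # adapt send_framed()/recv_framed() to the real wire format.

    from asyncio import open_connection, run as asyncioRun
    from pickle import dumps, loads
    from struct import pack, unpack

    NAME = "demo_subscriber"                       # hypothetical subscriber name
    ASTARQ_HOST, ASTARQ_PORT = "127.0.0.1", 5555   # hypothetical AStarQ address

    async def send_framed(writer, obj):
        payload = dumps(obj)
        writer.write(pack("!I", len(payload)) + payload)
        await writer.drain()

    async def recv_framed(reader):
        size = unpack("!I", await reader.readexactly(4))[0]
        return loads(await reader.readexactly(size))

    async def main():
        reader, writer = await open_connection(ASTARQ_HOST, ASTARQ_PORT)
        # handshake expected by handle_TCP_client : "clientName:host:port"
        await send_framed(writer, "{0}:{1}:{2}".format(NAME, "127.0.0.1", 0))
        # then receive Qmsg objects forwarded to this subscriber by AStarQ
        while True:
            qmsg = await recv_framed(reader)
            print("received :", qmsg)

    if __name__ == "__main__":
        asyncioRun(main())

Any language able to open a TCP socket and reproduce the same framing could subscribe in exactly the same way, which is the whole point of keeping the entry format minimal.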

  

Messages can be "routed" with specific rules:


    #!/usr/bin/env python
    # coding:utf-8
    
    from os import getcwd as osGetcwd, makedirs as osMakedirs
    from os.path import join as ospathJoin
    
    # relative import
    from sys import path; path.append("..")
    from common.Helpers.helpers import getOrDefault, threadIt
    
    LOST_QMSG_DIR = ospathJoin(osGetcwd(), "logs", "lost")
    
    @threadIt
    def LostQmsg(Qmsg):
        """ dump a lost message to disk so it can be inspected later """
        global LOST_QMSG_DIR
        osMakedirs(LOST_QMSG_DIR, exist_ok=True)
        with open(ospathJoin(LOST_QMSG_DIR, "{0}.Qmsg".format(Qmsg.id)), 'w') as file:
            file.write(str(Qmsg.__dict__))
    
    def starQ_route_msg(self, cur_msg, QueuesOut):#, logger:MyLogger):
        try:
            QueuesOut[cur_msg.too].append(cur_msg)
            self.logger.debug("{0} : routing_rules, sent message id '{1}' from '{2}' to '{3}'".format(self.Name, cur_msg.id, cur_msg.frome, cur_msg.too))
        except Exception as e:
            self.logger.critical("{0} : routing_rules, error while trying to route message id '{1}' in Threads Queues '{2}' : {3}".format(self.Name, getOrDefault(cur_msg.id, "#None"), cur_msg.too, e))
            if cur_msg.too not in QueuesOut:
                self.logger.critical("{0} : routing_rules, threads Queue '{1}' doesn't exist in subscribers StarQs...".format(self.Name, cur_msg.too))

            self.logger.critical("{0} : routing_rules, message id '{1}' has been lost !!!".format(self.Name, getOrDefault(cur_msg.id, "#None")))
            LostQmsg(cur_msg)
    
    #================================================================
    if __name__ == "__main__":
        pass
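
For illustration, here is a hypothetical driver showing how starQ_route_msg dispatches messages into per-subscriber deques. The Router and Msg stand-ins, the subscriber names and the message ids are all made up for the example; only starQ_route_msg (and, indirectly, LostQmsg) comes from routing_rules above.

    #!/usr/bin/env python
    # coding:utf-8
    # Hypothetical usage sketch for starQ_route_msg.

    from collections import deque
    from logging import getLogger, basicConfig, DEBUG

    from common.ThreadQs.routing_rules import starQ_route_msg

    class Router:
        """ minimal stand-in exposing the attributes starQ_route_msg expects (Name, logger) """
        Name = "AStarQ"
        logger = getLogger("AStarQ")

    class Msg:
        """ minimal stand-in for a Qmsg (id, frome, too) """
        def __init__(self, id, frome, too):
            self.id = id ; self.frome = frome ; self.too = too

    if __name__ == "__main__":
        basicConfig(level=DEBUG)
        QueuesOut = {"trader1": deque(), "trader2": deque()}
        router = Router()
        # known destination : appended to the "trader1" deque
        starQ_route_msg(router, Msg(id=1, frome="scraper", too="trader1"), QueuesOut)
        # unknown destination : logged as critical and dumped to logs/lost by LostQmsg
        starQ_route_msg(router, Msg(id=2, frome="scraper", too="unknown"), QueuesOut)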
  

Next step? See the distributed framework... (with SSH and shared objects between machines)