Jupyter-Lab Strategy creator Scrape data Explore database

Low Level Socket

A flexible, reliable and performant way for applications to interact...




'SOCK_STREAM' over TCP/IP is neither a new protocol nor a 'bling-bling technology', but it is really flexible and powerful!
This is presumably also why the documentation of Python's (wonderful?) standard logging module proposes a snippet (see the official documentation) built on this protocol, the socketserver module and length-prefixed serialized messages:



    import pickle
    import logging
    import logging.handlers
    import socketserver
    import struct
    
    
    class LogRecordStreamHandler(socketserver.StreamRequestHandler):
        """Handler for a streaming logging request.

        Reads length-prefixed, pickled LogRecord attribute dicts from the
        connection and logs each one using whatever logging policy is
        configured locally.
        """

        def handle(self):
            """
            Handle multiple requests - each expected to be a 4-byte big-endian
            length, followed by the LogRecord in pickle format. Logs the record
            according to whatever policy is configured locally.

            The loop ends when the peer closes the connection, whether that
            happens between two messages or in the middle of one.
            """
            while True:
                header = self._recv_exact(4)
                if header is None:
                    break
                slen = struct.unpack('>L', header)[0]
                payload = self._recv_exact(slen)
                if payload is None:
                    # Peer went away mid-message: drop the partial record.
                    break
                obj = self.unPickle(payload)
                record = logging.makeLogRecord(obj)
                self.handleLogRecord(record)

        def _recv_exact(self, size):
            """Read exactly *size* bytes from the connection.

            Returns the bytes read, or None if the peer closed the
            connection before *size* bytes arrived.
            """
            parts = []
            remaining = size
            while remaining > 0:
                part = self.connection.recv(remaining)
                if not part:
                    # recv() returning b'' means EOF; without this check the
                    # read loop would spin forever on a closed connection.
                    return None
                parts.append(part)
                remaining -= len(part)
            return b''.join(parts)

        def unPickle(self, data):
            """Deserialize one pickled payload received from the connection."""
            # SECURITY: pickle.loads() can execute arbitrary code. Only accept
            # connections from trusted peers, or override this method with a
            # safer deserializer.
            return pickle.loads(data)

        def handleLogRecord(self, record):
            """Dispatch *record* to the configured logger."""
            # if a name is specified, we use the named logger rather than the one
            # implied by the record.
            if self.server.logname is not None:
                name = self.server.logname
            else:
                name = record.name
            logger = logging.getLogger(name)
            # N.B. EVERY record gets logged. This is because Logger.handle
            # is normally called AFTER logger-level filtering. If you want
            # to do filtering, do it at the client end to save wasting
            # cycles and network bandwidth!
            logger.handle(record)
    
    class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
        """
        Minimal threaded TCP server that receives logging records; intended
        for testing.
        """

        allow_reuse_address = True

        def __init__(self, host='localhost',
                     port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                     handler=LogRecordStreamHandler):
            address = (host, port)
            super().__init__(address, handler)
            # Per-instance state: optional logger-name override read by the
            # request handler, select() poll interval, and the stop flag.
            self.logname = None
            self.timeout = 1
            self.abort = 0

        def serve_until_stopped(self):
            """Handle incoming requests until ``self.abort`` becomes truthy.

            The flag is checked after each poll, so at least one poll of
            ``self.timeout`` seconds always takes place.
            """
            from select import select as wait_readable
            while True:
                readable = wait_readable([self.socket.fileno()],
                                         [], [],
                                         self.timeout)[0]
                if readable:
                    self.handle_request()
                if self.abort:
                    break
    
    def main():
        """Configure root logging, then run the TCP log receiver until stopped."""
        record_layout = '%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s'
        logging.basicConfig(format=record_layout)
        receiver = LogRecordSocketReceiver()
        print('About to start TCP server...')
        receiver.serve_until_stopped()

    if __name__ == '__main__':
        main()
  

The classes below instantiate socket objects that allow messages to be sent between two Python processes of any type (asyncio process, threaded process, multiprocessing process...)

*trust me :)


    #!/usr/bin/env python
    # coding:utf-8

    import socket

    class MySocketObj(socket.socket):
        """Blocking TCP client socket exchanging length-prefixed pickled messages.

        Wire format of one message: a 4-byte big-endian unsigned length,
        followed by that many bytes of pickle data.
        """

        Name = "MySocketObj"

        def __init__(self, name: str = None, timeout: int = None):
            """Create an unconnected IPv4/TCP socket.

            Args:
                name: optional label stored in ``self.Name``; the class-level
                    default is kept when omitted.
                timeout: optional socket timeout handed to ``settimeout``.
            """
            # Imported here so the class does not rely on module-level imports
            # that this snippet does not provide.
            import pickle
            import struct

            super().__init__(socket.AF_INET, socket.SOCK_STREAM, socket.SOL_TCP)
            if name is not None:
                self.Name = name
            self.context = None
            self.pickle = pickle
            self.struct = struct
            self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if timeout is not None:
                self.settimeout(timeout)

        def make_connection(self, server: str = "127.0.0.1", port: int = 54321):
            """Connect to ``(server, port)`` and return ``self``.

            For any server other than "127.0.0.1" / "localhost" a context
            object is created and stored in ``self.context`` before connecting.
            """
            if server != "127.0.0.1" and server.lower() != "localhost":
                # NOTE(review): SSL_client_context is not defined in this
                # snippet and must be provided elsewhere. The value returned by
                # wrap_socket() is discarded; if the context is a standard
                # ssl.SSLContext, wrap_socket() returns a NEW SSLSocket and
                # detaches the socket passed in - confirm this path works.
                self.context = SSL_client_context()
                self.context.wrap_socket(self, server_hostname=server)
            self.connect((server, int(port)))
            if self.Name is None:
                self.Name = "sock_{0}".format(self.getsockname()[1])
            return self

        def send_data(self, data):
            """Pickle *data* and send it as one length-prefixed message."""
            serialized_data = self.pickle.dumps(data)
            self.sendall(self.struct.pack('>L', len(serialized_data)))
            self.sendall(serialized_data)

        def _recv_exact(self, size):
            """Read exactly *size* bytes; return None if the peer closes first."""
            parts = []
            remaining = size
            while remaining > 0:
                part = self.recv(remaining)
                if not part:
                    # recv() returning b'' means EOF; without this check the
                    # read loop would spin forever on a closed connection.
                    return None
                parts.append(part)
                remaining -= len(part)
            return b''.join(parts)

        def receive_data(self):
            """Receive one message and return the unpickled object.

            Returns False when the peer closed the connection before a
            complete message arrived (a payload that is itself ``False`` is
            indistinguishable from that case).
            """
            header = self._recv_exact(4)
            if header is None:
                return False
            slen = self.struct.unpack('>L', header)[0]
            payload = self._recv_exact(slen)
            if payload is None:
                return False
            # SECURITY: pickle.loads() can execute arbitrary code; only use
            # this with trusted peers.
            return self.pickle.loads(payload)

        def __enter__(self):
            return self

        def __exit__(self, *args):
            if not self._closed:
                self.close()

    from asyncio import open_connection as asyncioOpen_connection
    class MyAsyncSocketObj:
        """asyncio client exchanging length-prefixed pickled messages.

        Wire format of one message: a 4-byte big-endian unsigned length,
        followed by that many bytes of pickle data.
        """

        Name = "MyAsyncSocketObj"

        def __init__(self, name=None):
            """Create an unconnected client; *name* overrides ``Name`` if given."""
            # Imported here so the class does not rely on module-level imports
            # that this snippet does not provide.
            import pickle
            import struct

            if name is not None:
                self.Name = name
            # Same defaults as make_connection(), so that `async with` works
            # even when make_connection() was never called beforehand.
            self.host = "127.0.0.1"
            self.port = 54321
            self.reader = None
            self.writer = None
            self.sock_info = None
            self.pickle = pickle
            self.struct = struct

        async def make_connection(self, server: str = "127.0.0.1", port: int = 54321):
            """Open the stream pair to ``(server, port)`` and return ``self``.

            The target is remembered in ``self.host`` / ``self.port``. Nothing
            is opened if a writer already exists.
            """
            self.host = server
            self.port = port
            if self.writer is None:
                self.reader, self.writer = await asyncioOpen_connection(server, port)
                self.sock_info = (self.writer.get_extra_info('socket')).getsockname()
            return self

        async def send_data(self, data):
            """Pickle *data*, write it as one length-prefixed message and drain."""
            serialized_data = self.pickle.dumps(data)
            self.writer.write(self.struct.pack('>L', len(serialized_data)))
            self.writer.write(serialized_data)
            await self.writer.drain()

        async def receive_data(self):
            """Receive one message and return the unpickled object.

            Returns False when the stream reaches EOF before a complete
            message arrived (a payload that is itself ``False`` is
            indistinguishable from that case).
            """
            from asyncio import IncompleteReadError

            try:
                # readexactly() waits for the full count; read(n) may return
                # fewer bytes and returns b'' forever once EOF is reached.
                header = await self.reader.readexactly(4)
                slen = self.struct.unpack('>L', header)[0]
                payload = await self.reader.readexactly(slen)
            except IncompleteReadError:
                return False
            # SECURITY: pickle.loads() can execute arbitrary code; only use
            # this with trusted peers.
            return self.pickle.loads(payload)

        async def close_connection(self):
            """Close the writer, if any, and forget both stream objects."""
            if self.writer is not None:
                self.writer.close()
                await self.writer.wait_closed()
            self.reader = None
            self.writer = None

        async def __aenter__(self):
            return await self.make_connection(self.host, self.port)

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            await self.close_connection()
  

With this technique, communication can also be done over the network (internally and externally):



    # In the class above, a context is set on the object when the communication does not go through the loopback interface (127.0.0.1 / localhost)

    #   def make_connection(self, server:str="127.0.0.1", port:int=54321):
    #   if server != "127.0.0.1" and server.lower() != "localhost":
    #       self.context = SSL_client_context()
    #       self.context.wrap_socket(self, server_hostname=server)
    #       self.connect((server, int(port)))
    #   else:   
    #       self.connect((server, int(port)))
    #   if self.Name is None:
    #       self.Name = "sock_{0}".format(self.getsockname()[1])
    #   return self

    # Open the notification client and log which transport ended up in use.
    with MySocketObj().make_connection(
        server=self.config.NT_IP,
        port=int(self.config.NT_PORT),
    ) as notifs_socket:
        if notifs_socket.context is None:
            # No context was set on the socket: report a plain TCP connection.
            self.Logger.info(
                "Notifs client TCP : connection established to '{0}' destport '{1}' srcport '{2}'".format(
                    self.config.NT_IP,
                    self.config.NT_PORT,
                    notifs_socket.getsockname()[1],
                )
            )
        else:
            # A context was set on the socket: report the negotiated version.
            self.Logger.info(
                "Notifs client SSL : connection established with encryption: '{0}' to '{1}' destport '{2}' srcport '{3}'".format(
                    notifs_socket.version(),
                    self.config.NT_URI,
                    self.config.NT_PORT,
                    notifs_socket.getsockname()[1],
                )
            )

    

It can also be used from other programming languages or scripts, as long as the protocol and the serialized message format match the 'specification'...

**This is why many other elaborate messaging / distributed-computing modules, frameworks and systems written in Python, C/C++, C# (...) rely deeply and extensively on the 'TCP/IP SOCK_STREAM' protocol!