Low Level Socket
A flexible, reliable and performant way for applications to interact...
'SOCK_STREAM' over TCP/IP is not a new protocol or a 'bling-bling technology', but it is really flexible and powerful!
This is also why, I suppose, the (wonderful?) standard Python logging module proposes a snippet based on this protocol (see the
official documentation), using the
socketserver module and
length-prefixed serialized messages:
import pickle
import logging
import logging.handlers
import socketserver
import struct
class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Handler for a streaming logging request.

    This basically logs the record using whatever logging policy is
    configured locally.
    """

    def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.

        Returns once the peer closes the connection, including when it
        closes in the middle of a length prefix or of a payload.
        """
        while True:
            header = self._recv_exact(4)
            if header is None:
                break
            slen = struct.unpack('>L', header)[0]
            payload = self._recv_exact(slen)
            if payload is None:
                # The peer went away mid-record: there is no complete
                # record to log, so stop serving this connection.
                break
            obj = self.unPickle(payload)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def _recv_exact(self, size):
        """Read exactly *size* bytes from the connection.

        A stream socket may deliver fewer bytes than requested per recv()
        call, so keep reading until the full amount has arrived.

        Returns the bytes read, or None if the peer closed the connection
        before *size* bytes were received.
        """
        pieces = []
        remaining = size
        while remaining > 0:
            piece = self.connection.recv(remaining)
            if not piece:
                # recv() returning b'' means the peer closed the connection.
                return None
            pieces.append(piece)
            remaining -= len(piece)
        return b''.join(pieces)

    def unPickle(self, data):
        """Deserialize one pickled payload received from the peer."""
        # SECURITY: pickle.loads() can execute arbitrary code while loading.
        # Only expose this receiver to trusted peers.
        return pickle.loads(data)

    def handleLogRecord(self, record):
        """Dispatch *record* to the logger chosen by the server or the record."""
        # if a name is specified, we use the named logger rather than the one
        # implied by the record.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # N.B. EVERY record gets logged. This is because Logger.handle
        # is normally called AFTER logger-level filtering. If you want
        # to do filtering, do it at the client end to save wasting
        # cycles and network bandwidth!
        logger.handle(record)
class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """
    Threaded TCP server receiving logging records; suitable for testing.
    """

    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        """Bind to (host, port) and hand each connection to *handler*."""
        address = (host, port)
        super().__init__(address, handler)
        # Polled by serve_until_stopped(); a true value ends its loop.
        self.abort = 0
        # Timeout, in seconds, passed to select() in serve_until_stopped().
        self.timeout = 1
        # Read by LogRecordStreamHandler.handleLogRecord: when not None,
        # records are logged under this logger name.
        self.logname = None

    def serve_until_stopped(self):
        """Handle incoming requests until ``self.abort`` becomes true."""
        import select

        while True:
            readable, _, _ = select.select(
                [self.socket.fileno()], [], [], self.timeout
            )
            if readable:
                self.handle_request()
            # The flag is re-read after every wait, so a stop request is
            # noticed after at most one select() timeout.
            if self.abort:
                break
def main():
    """Configure logging, then run a LogRecordSocketReceiver serve loop."""
    log_format = '%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s'
    logging.basicConfig(format=log_format)
    receiver = LogRecordSocketReceiver()
    print('About to start TCP server...')
    receiver.serve_until_stopped()


if __name__ == '__main__':
    main()
The classes below instantiate a socket object that allows messages to be sent between two Python processes of any type (asyncio process, threaded process, multiprocessing process...)
*trust me :)
#!/usr/bin/env python
# coding:utf-8
import pickle
import socket
import struct
class MySocketObj(socket.socket):
    """TCP client socket exchanging length-prefixed, pickled messages.

    Each message on the wire is a 4-byte big-endian unsigned length
    followed by that many bytes of pickle data.
    """

    Name = "MySocketObj"

    def __init__(self, name: str = None, timeout: int = None):
        """Create the underlying TCP socket.

        name: optional label stored in ``Name``; the class default is kept
            when omitted.
        timeout: optional value passed to ``settimeout``.
        """
        super().__init__(socket.AF_INET, socket.SOCK_STREAM, socket.SOL_TCP)
        if name is not None:
            self.Name = name
        self.context = None
        self.pickle = pickle
        self.struct = struct
        self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if timeout is not None:
            self.settimeout(timeout)

    def make_connection(self, server: str = "127.0.0.1", port: int = 54321):
        """Connect to (server, port) and return ``self``.

        For any server other than "127.0.0.1" / "localhost", a context
        from ``SSL_client_context()`` is stored in ``self.context`` first.
        """
        if server != "127.0.0.1" and server.lower() != "localhost":
            self.context = SSL_client_context()
            # NOTE(review): assuming SSL_client_context() returns an
            # ssl.SSLContext, wrap_socket() returns a new SSLSocket and that
            # return value is discarded here - confirm that this object is
            # really the encrypted endpoint afterwards.
            self.context.wrap_socket(self, server_hostname=server)
        self.connect((server, int(port)))
        if self.Name is None:
            self.Name = "sock_{0}".format(self.getsockname()[1])
        return self

    def send_data(self, data):
        """Pickle *data* and send it preceded by its 4-byte length."""
        serialized_data = self.pickle.dumps(data)
        self.sendall(self.struct.pack('>L', len(serialized_data)))
        self.sendall(serialized_data)

    def _recv_exact(self, size):
        """Read exactly *size* bytes from the socket.

        A stream socket may deliver fewer bytes than requested per recv()
        call, so keep reading until the full amount has arrived.

        Returns the bytes read, or None if the peer closed the connection
        before *size* bytes were received.
        """
        pieces = []
        remaining = size
        while remaining > 0:
            piece = self.recv(remaining)
            if not piece:
                # recv() returning b'' means the peer closed the connection.
                return None
            pieces.append(piece)
            remaining -= len(piece)
        return b''.join(pieces)

    def receive_data(self):
        """Receive one length-prefixed message and return the unpickled object.

        Returns False when the peer closed the connection before a complete
        message (length prefix plus payload) was received.
        """
        header = self._recv_exact(4)
        if header is None:
            return False
        slen = self.struct.unpack('>L', header)[0]
        payload = self._recv_exact(slen)
        if payload is None:
            return False
        # SECURITY: pickle.loads() can execute arbitrary code while loading.
        # Only use this with trusted peers.
        return self.pickle.loads(payload)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # close() is safe to call on an already closed socket.
        self.close()
from asyncio import open_connection as asyncioOpen_connection
class MyAsyncSocketObj:
    """asyncio TCP client exchanging length-prefixed, pickled messages.

    Each message on the wire is a 4-byte big-endian unsigned length
    followed by that many bytes of pickle data.
    """

    Name = "MyAsyncSocketObj"

    def __init__(self, name=None, server: str = "127.0.0.1", port: int = 54321):
        """Prepare an unconnected client.

        name: optional label stored in ``Name``; the class default is kept
            when omitted.
        server, port: target used by ``async with`` when make_connection()
            has not been called explicitly; they default to the same values
            as make_connection().
        """
        if name is not None:
            self.Name = name
        # Set here so that __aenter__ can read them before any explicit
        # make_connection() call.
        self.host = server
        self.port = port
        self.reader = None
        self.writer = None
        self.sock_info = None
        self.pickle = pickle
        self.struct = struct

    async def make_connection(self, server: str = "127.0.0.1", port: int = 54321):
        """Open the connection if none is open yet and return ``self``."""
        self.host = server
        self.port = port
        if self.writer is None:
            self.reader, self.writer = await asyncioOpen_connection(server, port)
            self.sock_info = self.writer.get_extra_info('socket').getsockname()
        return self

    async def send_data(self, data):
        """Pickle *data* and send it preceded by its 4-byte length."""
        serialized_data = self.pickle.dumps(data)
        self.writer.write(self.struct.pack('>L', len(serialized_data)))
        self.writer.write(serialized_data)
        await self.writer.drain()

    async def receive_data(self):
        """Receive one length-prefixed message and return the unpickled object.

        Returns False when the stream ended before a complete message
        (length prefix plus payload) was received.
        """
        from asyncio import IncompleteReadError

        try:
            # readexactly() waits for the full amount, where read(n) may
            # return fewer bytes, or b'' forever once the stream has ended.
            header = await self.reader.readexactly(4)
            slen = self.struct.unpack('>L', header)[0]
            payload = await self.reader.readexactly(slen)
        except IncompleteReadError:
            return False
        # SECURITY: pickle.loads() can execute arbitrary code while loading.
        # Only use this with trusted peers.
        return self.pickle.loads(payload)

    async def close_connection(self):
        """Close the writer, if any, and reset reader and writer to None."""
        if self.writer is not None:
            self.writer.close()
            await self.writer.wait_closed()
        self.reader = None
        self.writer = None

    async def __aenter__(self):
        return await self.make_connection(self.host, self.port)

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close_connection()
With this technique, communication can also take place over the network (internally and externally):
# In the class above, an SSL context is set on the object when the connection is not made over the loopback interface (127.0.0.1 / localhost):
# def make_connection(self, server:str="127.0.0.1", port:int=54321):
# if server != "127.0.0.1" and server.lower() != "localhost":
# self.context = SSL_client_context()
# self.context.wrap_socket(self, server_hostname=server)
# self.connect((server, int(port)))
# else:
# self.connect((server, int(port)))
# if self.Name is None:
# self.Name = "sock_{0}".format(self.getsockname()[1])
# return self
# Usage fragment taken from inside a method: `self.config` and `self.Logger`
# belong to the enclosing object, which is not shown here.
with MySocketObj().make_connection(
    server=self.config.NT_IP,
    port=int(self.config.NT_PORT),
) as NotifsSocket:
    if NotifsSocket.context is None:
        # No context was set on the socket: log the plain TCP message.
        self.Logger.info(
            "Notifs client TCP : connection established to '{0}' destport '{1}' srcport '{2}'".format(
                self.config.NT_IP,
                self.config.NT_PORT,
                NotifsSocket.getsockname()[1],
            )
        )
    else:
        # A context was set: log the SSL message with the negotiated version.
        self.Logger.info(
            "Notifs client SSL : connection established with encryption: '{0}' to '{1}' destport '{2}' srcport '{3}'".format(
                NotifsSocket.version(),
                self.config.NT_URI,
                self.config.NT_PORT,
                NotifsSocket.getsockname()[1],
            )
        )
It can also be used with other programming languages or scripts, as long as the protocol and the serialized message format match the 'specification'...
**This is why many other elaborate Python, C/C++, C# (...) modules, frameworks and systems for messaging or distributed computing (...) make deep and extensive use of the 'TCP/IP SOCK_STREAM' protocol!