Zero-copy solution; language, operating system and machine agnostic
Universal and efficient message exchange
Exchanging data between processes written in different programming languages is, unfortunately, not so easy — and it becomes genuinely technical when working across different operating systems and machines.
Fortunately, there are some excellent "universal" solutions for this kind of problem: Protocol Buffers and Cap'n Proto.
Protocol Buffers is well known and offers an incredible level of abstraction, simplicity and efficiency with RPC calls.
On the other hand, Cap'n Proto proposes something really interesting: "zero copy", thanks to the absence of an encoding/decoding step! (see the website for more info)
Cap'n Proto can generate a file containing a class with methods to "format" and "interact with" byte messages, for most of the languages in common use nowadays.
The generated class is based on a flat, simple file (*.capnp) holding the data definition, which is the same for every language, machine or OS (*except for some cases).
The byte messages can then be exchanged with programs in other languages and used as-is, without complex and expensive high-level serialisation/deserialisation...
Message definition — an example capnp file for a "logger message":
#// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\
# // ! \\ No underscore in label field name !!! // ! \\
# // ! \\ field name camelCase // ! \\
# // ! \\ field number start = 0 // ! \\
# // ! \\ enum number start = position // ! \\
#// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\// ! \\
# generate the loggerMsg Cap'n Proto bindings
# the Go capnp plugin must be installed first :
# go get -u -t zombiezen.com/go/capnproto2
# GO111MODULE=off go get -u capnproto.org/go/capnp/v3/
# cd "/users/Toto/Desktop/govenv/api/capnp/LoggerMsg"
# capnp compile -I "/users/Toto/Desktop/govenv/api/capnp/loggerMsg/go-capnp/std" -ogo logger.capnp
#using Go = import "/go.capnp";
@0xfc867d59c4f2e15c;
#$Go.package("MyLogger");
#$Go.import("govenv/pkg/common/MyLogger");
# Log level / event type of a record. The ordinals 0..11 are part of the wire
# format: keep them aligned with any consumer-side lookup table (e.g. the
# server's LEVEL_STRINGS array, which is indexed by this ordinal).
enum Level {
  notset @0;
  debug @1;
  stream @2;
  info @3;
  logon @4;
  logout @5;
  trade @6;
  schedule @7;
  report @8;
  warning @9;
  error @10;
  critical @11;
}
# One structured log record. Every field is carried as Text so any producer
# language can fill it without numeric conversions; field numbers are part of
# the wire format and must never be reused or renumbered.
struct LoggerMsg {
  # // recorded by log_server :
  timestamp @0 :Text;      # when the event occurred (producer-formatted)
  hostname @1 :Text;       # host/machine name of the producer
  loggerName @2 :Text;     # name of the emitting logger
  module @3 :Text;         # module (name portion of the filename)
  level @4 :Level;         # severity/event type (see Level enum)
  filename @5 :Text;       # filename portion of the source path
  functionName @6 :Text;   # function that emitted the record
  lineNumber @7 :Text;     # source line number (as text)
  message @8 :Text;        # the log message itself
  # // others
  # // path of the file
  pathName @9 :Text;
  # // Process information
  processId @10 :Text;
  processName @11 :Text;
  # // Thread information
  threadId @12 :Text;
  threadName @13 :Text;
  # // Additional requested fields
  serviceName @14 :Text;
  # // Optional stack trace for errors
  stackTrace @15 :Text;
}
Samples of the generated external class for Rust that handle the "logger message":
// @generated by the capnpc-rust plugin to the Cap'n Proto schema compiler.
// DO NOT EDIT.
// source: api/capnp/loggerMsg/logger.capnp
// Rust mirror of the `Level` enum from logger.capnp; the u16 discriminants
// match the schema ordinals (0..=11), so `level as usize` can index a
// 12-entry lookup table on the consumer side.
#[repr(u16)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Level {
    Notset = 0,
    Debug = 1,
    Stream = 2,
    Info = 3,
    Logon = 4,
    Logout = 5,
    Trade = 6,
    Schedule = 7,
    Report = 8,
    Warning = 9,
    Error = 10,
    Critical = 11,
}
impl ::capnp::introspect::Introspect for Level {
fn introspect() -> ::capnp::introspect::Type { ::capnp::introspect::TypeVariant::Enum(::capnp::introspect::RawEnumSchema { encoded_node: &level::ENCODED_NODE, annotation_types: level::get_annotation_types }).into() }
}
impl ::core::convert::From for ::capnp::dynamic_value::Reader<'_> {
fn from(e: Level) -> Self { ::capnp::dynamic_value::Enum::new(e.into(), ::capnp::introspect::RawEnumSchema { encoded_node: &level::ENCODED_NODE, annotation_types: level::get_annotation_types }.into()).into() }
}
impl ::core::convert::TryFrom for Level {
type Error = ::capnp::NotInSchema;
fn try_from(value: u16) -> ::core::result::Result>::Error> {
match value {
0 => ::core::result::Result::Ok(Self::Notset),
1 => ::core::result::Result::Ok(Self::Debug),
2 => ::core::result::Result::Ok(Self::Stream),
3 => ::core::result::Result::Ok(Self::Info),
4 => ::core::result::Result::Ok(Self::Logon),
//(...)
mod level {
pub static ENCODED_NODE: [::capnp::Word; 69] = [
::capnp::word(0, 0, 0, 0, 5, 0, 6, 0),
::capnp::word(43, 197, 135, 111, 61, 85, 196, 216),
::capnp::word(33, 0, 0, 0, 2, 0, 0, 0),
::capnp::word(92, 225, 242, 196, 89, 125, 134, 252),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(21, 0, 0, 0, 58, 1, 0, 0),
::capnp::word(37, 0, 0, 0, 7, 0, 0, 0),
];
//(...)
pub mod logger_msg {
#[derive(Copy, Clone)]
pub struct Owned(());
impl ::capnp::introspect::Introspect for Owned { fn introspect() -> ::capnp::introspect::Type { ::capnp::introspect::TypeVariant::Struct(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types, annotation_types: _private::get_annotation_types }).into() } }
impl ::capnp::traits::Owned for Owned { type Reader<'a> = Reader<'a>; type Builder<'a> = Builder<'a>; }
impl ::capnp::traits::OwnedStruct for Owned { type Reader<'a> = Reader<'a>; type Builder<'a> = Builder<'a>; }
impl ::capnp::traits::Pipelined for Owned { type Pipeline = Pipeline; }
pub struct Reader<'a> { reader: ::capnp::private::layout::StructReader<'a> }
impl <> ::core::marker::Copy for Reader<'_,> {}
impl <> ::core::clone::Clone for Reader<'_,> {
fn clone(&self) -> Self { *self }
}
//(...)
pub fn get_field_types(index: u16) -> ::capnp::introspect::Type {
match index {
0 => <::capnp::text::Owned as ::capnp::introspect::Introspect>::introspect(),
1 => <::capnp::text::Owned as ::capnp::introspect::Introspect>::introspect(),
2 => <::capnp::text::Owned as ::capnp::introspect::Introspect>::introspect(),
3 => <::capnp::text::Owned as ::capnp::introspect::Introspect>::introspect(),
4 => ::introspect(),
5 => <::capnp::text::Owned as ::capnp::introspect::Introspect>::introspect(),
//(...)
pub fn get_annotation_types(child_index: Option, index: u32) -> ::capnp::introspect::Type {
panic!("invalid annotation indices ({:?}, {}) ", child_index, index)
}
pub static RAW_SCHEMA: ::capnp::introspect::RawStructSchema = ::capnp::introspect::RawStructSchema {
encoded_node: &ENCODED_NODE,
nonunion_members: NONUNION_MEMBERS,
members_by_discriminant: MEMBERS_BY_DISCRIMINANT,
members_by_name: MEMBERS_BY_NAME,
};
pub static NONUNION_MEMBERS : &[u16] = &[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15];
pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[];
pub static MEMBERS_BY_NAME : &[u16] = &[5,6,1,4,7,2,8,3,9,10,11,14,15,12,13,0];
pub const TYPE_ID: u64 = 0xa92b_aada_7e15_7478;
}
Example of a Rust TCP log server that receives a "logger message" and writes it to a file:
use capnp::{message::ReaderOptions, serialize_packed};
use std::{
collections::BTreeMap,
path::PathBuf,
sync::atomic::{AtomicU64, Ordering},
sync::Arc,
};
use tokio::{
fs::{self, File},
io::AsyncWriteExt,
net::{TcpListener, TcpStream},
sync::mpsc,
task,
time::{sleep, Duration},
};
use tracing::{error, info};
mod safe_socket_minimal;
use safe_socket_minimal::SafeSocket;
// Tunables for the writer pipeline.
const INITIAL_BATCH_SIZE: usize = 100; // starting writer batch size; adapted at runtime
const BUFFER_SIZE: usize = 1024; // capacity of the writer mpsc channel
const MAX_RETRIES: usize = 3; // file-write retries before giving up
const RETRY_DELAY_MS: u64 = 100; // pause between write retries
const MAX_FILE_BYTES: u64 = 1 * 1024 * 1024; // 1 MB
// BACKUP_COUNT = BACKUP_COUNT value + 2 ... I'm too lazy to change it (as it won't normally change frequently)
const BACKUP_COUNT: usize = 10;
// tracing_subscriber::fmt().init(); => tracing facade : tracing subscriber can be added if needed...
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Logger messages
// Generated Cap'n Proto bindings (capnpc-rust output for logger.capnp),
// pulled in as an inline module.
mod logger_capnp {
    include!("logger_capnp.rs");
}
// match to logger capnp level
// Indexed by the `Level` enum discriminant (0..=11) — must stay in sync
// with the capnp schema ordering.
const LEVEL_STRINGS: [&str; 12] = [
    "notset", "debug", "stream", "info", "logon", "logout", "trade", "schedule", "report",
    "warning", "error", "critical",
];
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// create log folder if not exists
/// Create the log folder at `path` (including parents) if it does not exist.
///
/// `create_dir_all` already succeeds when the directory is present, so the
/// previous `metadata`-then-create sequence was redundant and racy (the
/// directory could appear/disappear between the check and the create).
fn create_log_folder(path: &str) -> std::io::Result<()> {
    std::fs::create_dir_all(path)
}
/// Directory containing the running executable.
///
/// On failure (executable path unavailable, or it has no parent) this logs
/// the error and terminates the process with status 1, matching the
/// original fail-fast behaviour.
fn get_exec_parent_dir() -> PathBuf {
    let exe_path = match std::env::current_exe() {
        Ok(path) => path,
        Err(e) => {
            error!("Failed to get the executable path: {}", e);
            std::process::exit(1);
        }
    };
    if let Some(parent) = exe_path.parent() {
        return parent.to_path_buf();
    }
    error!("Failed to get parent directory of the executable");
    std::process::exit(1);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Main function
/// Entrypoint: prepare the log directory, bind the TCP listener, start the
/// single ordered writer task, and spawn one handler task per client.
#[tokio::main]
async fn main() -> tokio::io::Result<()> {
    tracing_subscriber::fmt().init();
    // Create the log directory in the SAME place the writer uses (next to
    // the executable). The previous `create_log_folder("logs")` was relative
    // to the current working directory, so running the server from any other
    // directory left `<exe_dir>/logs` missing and the writer's File::create
    // failing.
    let log_dir = get_exec_parent_dir().join("logs");
    create_log_folder(&log_dir.to_string_lossy())?;
    let log_file = log_dir.join("_main.log");
    let addr = "127.0.0.1:9020";
    let listener = TcpListener::bind(addr).await?;
    info!(message = "Server started", address = %addr);
    // Global monotonically-increasing sequence so the writer task can
    // restore total order across concurrently-handled clients.
    let sequence_counter = Arc::new(AtomicU64::new(0));
    let (writer_tx, writer_rx) = mpsc::channel::<(u64, String)>(BUFFER_SIZE);
    task::spawn(writer_task(
        writer_rx,
        log_file.clone(),
        MAX_FILE_BYTES,
        BACKUP_COUNT,
    ));
    loop {
        let (socket, addr) = listener.accept().await?;
        let writer_tx = writer_tx.clone();
        let sequence_counter = sequence_counter.clone();
        tokio::spawn(async move {
            if let Err(e) = handle_tcp_client(socket, writer_tx, sequence_counter).await {
                error!(message = "Error handling client", client_address = %addr, error = %e);
            }
        });
    }
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Handle incoming client connections
async fn handle_tcp_client(
socket: TcpStream,
writer_tx: mpsc::Sender<(u64, String)>,
sequence_counter: Arc,
) -> tokio::io::Result<()> {
let mut logger_sock = SafeSocket::new(socket);
loop {
let bytes_read = logger_sock.receive_data().await?;
if bytes_read.is_none() {
break;
}
let data = bytes_read.unwrap();
// Deserialize Cap'n Proto message
match serialize_packed::read_message(&mut &data[..], ReaderOptions::new()) {
Ok(reader) => {
let log_message = reader
.get_root::()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
// Extract and format each log writed field
let timestamp = log_message.get_timestamp().unwrap().to_str().unwrap();
let hostname = log_message.get_hostname().unwrap().to_str().unwrap();
let logger_name = log_message.get_logger_name().unwrap().to_str().unwrap();
let level = LEVEL_STRINGS[log_message.get_level().unwrap() as usize];
let filename = log_message.get_filename().unwrap().to_str().unwrap();
let function_name = log_message.get_function_name().unwrap().to_str().unwrap();
let line_number = log_message.get_line_number().unwrap().to_str().unwrap();
let message = log_message.get_message().unwrap().to_str().unwrap();
// Log message to stdout and file system
let fs_log_message = format!(
"{} {} {} {} {} {} {} {}",
timestamp,
hostname,
logger_name,
level,
filename,
function_name,
line_number,
message
);
//info!(fs_log_message);
// Forward the string message for file storage
writer_tx
.send((
sequence_counter.fetch_add(1, Ordering::SeqCst),
fs_log_message,
))
.await
.unwrap();
}
Err(e) => {
error!(message = "Failed to deserialize message", error = %e);
break;
}
}
}
Ok(())
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// async function to write messages in log file (ordered by clients)
pub async fn writer_task(
mut rx: mpsc::Receiver<(u64, String)>,
base_file_path: PathBuf,
max_bytes: u64,
backup_count: usize,
) -> tokio::io::Result<()> {
let mut file = File::create(&base_file_path).await?;
let mut file_size = 0u64;
let mut buffer: BTreeMap = BTreeMap::new();
let mut current_sequence: u64 = 0;
let mut batch_size = INITIAL_BATCH_SIZE;
while let Some((sequence, data)) = rx.recv().await {
// Insert messages in order
buffer.insert(sequence, data);
while buffer.len() >= batch_size || buffer.contains_key(¤t_sequence) {
let mut batch = Vec::new();
for _ in 0..batch_size {
if let Some(data) = buffer.remove(¤t_sequence) {
batch.push(data);
current_sequence += 1;
} else {
break;
}
}
if !batch.is_empty() {
for attempt in 0..=MAX_RETRIES {
let mut success = true;
for data in &batch {
let log_entry = format!("{}\n", data);
if file.write_all(log_entry.as_bytes()).await.is_err() {
success = false;
break;
}
file_size += data.len() as u64;
}
if success {
break;
} else if attempt < MAX_RETRIES {
sleep(Duration::from_millis(RETRY_DELAY_MS)).await;
} else {
return Err(tokio::io::Error::new(
tokio::io::ErrorKind::Other,
"Write failed after maximum retries",
));
}
}
// Rotate file if size exceeds limit
if file_size >= max_bytes {
file.flush().await?;
rotate_files(&base_file_path, backup_count).await?;
file = File::create(&base_file_path).await?;
file_size = 0;
}
file.flush().await?;
}
}
// Adjust batch size dynamically
if buffer.len() > batch_size {
batch_size = (batch_size * 2).min(1000);
} else if buffer.len() < batch_size / 2 {
batch_size = (batch_size / 2).max(10);
}
}
// Flush remaining messages in the buffer
for (_, data) in buffer {
let log_entry = format!("{}\n", data);
file.write_all(log_entry.as_bytes()).await?;
}
file.flush().await?;
Ok(())
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Async function to rotate log files
/// Shift existing backups up one slot (`log.N-1` -> `log.N`, highest first,
/// so nothing is clobbered prematurely) and finally move the active file to
/// `log.0`. The oldest backup is overwritten by the rename.
async fn rotate_files(base_path: &PathBuf, backup_count: usize) -> tokio::io::Result<()> {
    let backup = |idx: usize| base_path.with_extension(format!("log.{}", idx));
    let mut idx = backup_count;
    while idx >= 1 {
        let from = backup(idx - 1);
        // Skip slots that don't exist yet (early in the file's life).
        if fs::metadata(&from).await.is_ok() {
            fs::rename(&from, &backup(idx)).await?;
        }
        idx -= 1;
    }
    fs::rename(&base_path, &backup(0)).await?;
    Ok(())
}
Example of a Python client that sends this kind of message to the log server:
import capnp
import socket
import datetime
from os import getcwd as osGetcwd
from os.path import join as osPathJoin
# Load the Cap'n Proto schema from <cwd>/api/capnp/loggerMsg/logger.capnp —
# NOTE(review): path is CWD-relative, so the client must be started from the
# project root for the schema to be found.
data_msg = capnp.load(osPathJoin(osGetcwd(), "api", "capnp", "loggerMsg",'logger.capnp'))
# Load the Cap'n Proto schema
import struct
class SafeSocket():
    """Length-prefixed framing over a socket.

    Every payload is sent as a 4-byte big-endian length header followed by
    the raw bytes; `receive_data` reads exactly one such frame back.

    Fixes in this revision: the header read now loops until all 4 bytes
    arrive (a short `recv` previously mis-parsed the length), and
    `receive_data` returns the raw payload bytes — the old code called
    `self.deSerialize`, a method that does not exist, so every receive
    raised AttributeError. An optional `serializer_class` is kept for
    interface compatibility and applied to the payload when provided.
    """
    Name = "safeSocket"

    def __init__(self, conn, name=None, serializer_class=None):
        # conn: a connected socket-like object (sendall/recv/close).
        if not name is None: self.Name = name
        self.struct = struct
        self.conn = conn
        # Previously accepted but silently ignored; now honoured if given.
        self.serializer_class = serializer_class

    def send_data(self, serialized_data):
        """Send one frame: 4-byte big-endian length, then the payload."""
        self.conn.sendall(self.struct.pack('>L', len(serialized_data)))
        self.conn.sendall(serialized_data)

    def receive_data(self):
        """Read one frame; return its payload bytes, or False on EOF."""
        header = self._recv_exact(4)
        if header is None:
            return False
        slen = self.struct.unpack('>L', header)[0]
        payload = self._recv_exact(slen)
        if payload is None:
            return False
        if self.serializer_class is not None:
            return self.serializer_class(payload)
        return payload

    def _recv_exact(self, num_bytes):
        """Read exactly num_bytes from the socket; None if the peer closed."""
        chunks = []
        remaining = num_bytes
        while remaining > 0:
            chunk = self.conn.recv(remaining)
            if not chunk:  # b'' => connection closed mid-frame
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # NOTE(review): _closed is a CPython-internal attribute of
        # socket.socket — kept from the original; conn.close() is already
        # idempotent, so the guard is belt-and-braces only.
        if not self.conn._closed:
            self.conn.close()
def send_capnp_message():
    """Build one LoggerMsg Cap'n Proto message filled with sample values and
    return it packed-serialized (bytes), ready for SafeSocket.send_data().

    All fields except `level` are Text, as declared in logger.capnp.
    """
    # Create a Cap'n Proto message
    from datetime import datetime
    logger_msg = data_msg.LoggerMsg.new_message()
    logger_msg.timestamp = str(datetime.utcnow()) #"2025-01-28T12:34:56Z" #string // When the event occurred
    logger_msg.hostname = "youpi" #string // Host/machine name
    logger_msg.loggerName = "LoggerName" #string // Name of the logger (usually __name__)
    logger_msg.module = "Module" #string // Module (name portion of filename)
    # the proto capnp used the proto format for enum (0, 1, 2, 3, 4, 5)
    logger_msg.level = data_msg.Level.warning #= log_level_to_int[Level]/10
    logger_msg.filename = "Filename" #string // Filename portion of pathname
    logger_msg.functionName = "FunctionName" #string // Function name
    logger_msg.lineNumber = "LineNumber" #string // Source line number
    logger_msg.message = "Message" #string // The log message
    #logger_msg.relativeCreated = "RelativeCreated" #string // difference between logging triggered and loggerMessage creation
    logger_msg.pathName = "PathName" #string // Full pathname of the source file
    logger_msg.processId = "ProcessId" #string // Process ID
    logger_msg.processName = "ProcessName" #string // Process name
    logger_msg.threadId = "ThreadId" #string // Thread ID
    logger_msg.threadName = "ThreadName" #string // Thread name
    logger_msg.serviceName = "ServiceName" #string // Name of the service generating the log
    logger_msg.stackTrace = "StackTrace" #string // Stack trace if available
    # Serialize the message (packed wire format, matches the server's
    # serialize_packed::read_message)
    return logger_msg.to_bytes_packed()
if __name__ == "__main__":
    # Connect to Rust server
    with socket.create_connection(("127.0.0.1", 9020)) as sock:
        logSock = SafeSocket(sock)
        # NOTE(review): sends identical messages in a tight unbounded loop —
        # this is a load/demo client; add a sleep for realistic traffic.
        while True:
            serialized_message = send_capnp_message()
            logSock.send_data(serialized_message)
That's all.