Base classes

ZmqClient

class libstored.ZmqClient(host: str = 'localhost', port: int = 19026, multi: bool = False, timeout: float | None = None, context: None | Context = None, t: str | None = None, use_state: str | None = None, *args, **kwargs)

Asynchronous ZMQ client.

This client can connect to both the libstored.zmq_server.ZmqServer and stored::DebugZmqLayer.

acquire_macro() str | None
acquire_macro(*, block: Literal[False]) Future[str | None]
acquire_macro(*, sync: Literal[True]) str | None
acquire_macro(*, block: Literal[False], sync: Literal[True]) Future[str | None]

Get a free macro name.

In case there is no available macro name, None is returned. This can still be used to create a Macro object, although it is not that efficient.

Arguments * block : bool = True: perform a blocking call

Result * str | None: the macro name when block = True * otherwise a future with this str | None

alias(obj: str | Object, prefer: str | None = None, temporary: bool = True, permanentRef: Any = None) str | None
alias(obj: str | Object, prefer: str | None = None, temporary: bool = True, permanentRef: Any = None, *, block: Literal[False]) Future[str | None]
alias(obj: str | Object, prefer: str | None = None, temporary: bool = True, permanentRef: Any = None, *, sync: Literal[True]) str | None
alias(obj: str | Object, prefer: str | None = None, temporary: bool = True, permanentRef: Any = None, *, block: Literal[False], sync: Literal[True]) Future[str | None]

Assign an alias to an object.

Arguments * obj : str | Object: the object to assign an alias to, or its name * prefer : str | None = None: the preferred alias, or None to get any available one * temporary : bool = True: if True, assign a temporary alias, otherwise a permanent one * permanentRef : Any = None: when assigning a permanent alias, a reference to the user of this alias.

Result * str | None: the assigned alias, or None when no alias could be assigned, when block = True * otherwise a future with this str | None

capabilities() str
capabilities(*, block: Literal[False]) Future[str]
capabilities(*, sync: Literal[True]) str
capabilities(*, block: Literal[False], sync: Literal[True]) Future[str]

Get the capabilities of the connected ZMQ server.

Arguments * block : bool = True: perform a blocking call

Result * str with the capabilities when block = True * otherwise a future

close() None
close(*, block: Literal[False]) Future[None]
close(*, sync: Literal[True]) None
close(*, block: Literal[False], sync: Literal[True]) Future[None]

Alias for disconnect().

connect(host: str | None = None, port: int | None = None, multi: bool | None = None, default_state: bool = False) None
connect(host: str | None = None, port: int | None = None, multi: bool | None = None, default_state: bool = False, *, block: Literal[False]) Future[None]
connect(host: str | None = None, port: int | None = None, multi: bool | None = None, default_state: bool = False, *, sync: Literal[True]) None
connect(host: str | None = None, port: int | None = None, multi: bool | None = None, default_state: bool = False, *, block: Literal[False], sync: Literal[True]) Future[None]

Connect to the ZMQ server.

Arguments * host : str | None = None: the host to connect to, or None to use the configured host * port : int | None = None: the port to connect to, or None to use the configured port * multi : bool | None = None: whether to use multi-client safe commands, or None to use the configured value * default_state : bool = False: if True, do not restore the state from the state file * block : bool = True: perform a blocking call

Result * None: when block = True * otherwise a future

property context: Context

The ZMQ context used by this client.

disconnect() None
disconnect(*, block: Literal[False]) Future[None]
disconnect(*, sync: Literal[True]) None
disconnect(*, block: Literal[False], sync: Literal[True]) Future[None]

Disconnect from the ZMQ server.

Arguments * block : bool = True: perform a blocking call

Result * None: when block = True * otherwise a future

echo(msg: str) str
echo(msg: str, *, block: Literal[False]) Future[str]
echo(msg: str, *, sync: Literal[True]) str
echo(msg: str, *, block: Literal[False], sync: Literal[True]) Future[str]

Echo a message via the ZMQ server.

Arguments * msg : str: the message to echo * block : bool = True: perform a blocking call

Result * str: the echoed message when block = True * otherwise a future with this str

Raises * NotSupported: when the echo command is not supported by the server

fast_poll_threshold_s = 0.9
find(name: str, all=False) Object | Set[Object] | None

Find object(s) by name.

This functions uses the previously retrieved list of objects.

find_time() Object | None
find_time(*, block: Literal[False]) asyncio.Future[Object | None]
find_time(*, sync: Literal[True]) Object | None
find_time(*, block: Literal[False], sync: Literal[True]) concurrent.futures.Future[Object | None]

Find the time object. It should start with /t, and have a unit between parentheses, like /t (s).

Arguments * block : bool = True: perform a blocking call

Result * Object | None: the time object when found, or None when not found, when block = True * otherwise a future with this Object | None

property host: str

Configured or currently connected host.

identification() str
identification(*, block: Literal[False]) Future[str]
identification(*, sync: Literal[True]) str
identification(*, block: Literal[False], sync: Literal[True]) Future[str]

Get the identification string.

Arguments * block : bool = True: perform a blocking call

Result * str: the identification string, which is empty when not supported, when block = True * otherwise a future with this str

is_connected() bool

Check if connected to the ZMQ server.

list() List[Object]
list(*, block: Literal[False]) asyncio.Future[List[Object]]
list(*, sync: Literal[True]) List[Object]
list(*, block: Literal[False], sync: Literal[True]) concurrent.futures.Future[List[Object]]

List the objects available.

Arguments * block : bool = True: perform a blocking call

Result * List[Object]: the list of objects when block = True * otherwise a future with this List[Object]

Raises * NotSupported: when the List command is not supported by the server * InvalidResponse: when the List command returned an invalid response

macro(name: str | None, *args, **kwargs) Macro

Create a macro object for the given macro name.

property multi: bool

Return whether the client uses a subset of the commands that are safe when multiple connections to the same ZMQ server are made.

obj(x: str) Object

Get an object by name.

property objects: List[Object]
other_streams()
periodic(interval_s: float, f: Callable, *args, name: str | None = None) Task
periodic(interval_s: float, f: Callable, *args, name: str | None = None, block: Literal[False]) Future[Task] | Task

Run a function periodically while the client is alive.

Arguments * interval_s : float: the interval in seconds between calls; if 0, run as often as possible * f : Callable: the function to call periodically; can be a coroutine or a normal function * name : str | None = None: the name of the periodic task * *args: arguments to pass to f * block : bool = True: perform a blocking call

Result * asyncio.Task: the created periodic task when block = True * otherwise a future with this asyncio.Task

property port: int

Configured or currently connected port.

read_mem(pointer: int, size: int) bytearray
read_mem(pointer: int, size: int, *, block: Literal[False]) Future[bytearray]
read_mem(pointer: int, size: int, *, sync: Literal[True]) bytearray
read_mem(pointer: int, size: int, *, block: Literal[False], sync: Literal[True]) Future[bytearray]

Read memory from the connected device.

Arguments * pointer : int: the memory address to read from * size : int: the number of bytes to read * block : bool = True: perform a blocking call

Result * bytearray: the read data when block = True * otherwise a future with this bytearray

Raises * NotSupported: when the ReadMem command is not supported by the server * OperationFailed: when the ReadMem command failed

release_alias(alias: str, permanentRef=None) None
release_alias(alias: str, permanentRef=None, *, block: Literal[False]) Future[None]
release_alias(alias: str, permanentRef=None, *, sync: Literal[True]) None
release_alias(alias: str, permanentRef=None, *, block: Literal[False], sync: Literal[True]) Future[None]

Release an alias.

Arguments * alias : str: the alias to release * permanentRef : Any = None: when releasing a permanent alias, a reference to the user of this alias.

Result * None: when block = True * otherwise a future

release_macro(m: str | bytes | Macro) None
release_macro(m: str | bytes | Macro, *, block: Literal[False]) Future[None]
release_macro(m: str | bytes | Macro, *, sync: Literal[True]) None
release_macro(m: str | bytes | Macro, *, block: Literal[False], sync: Literal[True]) Future[None]

Release a macro.

Arguments * m : str | bytes | Macro: the macro to release, or its name * block : bool = True: perform a blocking call

Result * None: when block = True * otherwise a future

req(msg: bytes) bytes
req(msg: str) str
req(msg: bytes, *, block: Literal[False]) Future[bytes]
req(msg: str, *, block: Literal[False]) Future[str]
req(msg: bytes, *, sync: Literal[True]) bytes
req(msg: str, *, sync: Literal[True]) str
req(msg: bytes, *, block: Literal[False], sync: Literal[True]) Future[bytes]
req(msg: str, *, block: Literal[False], sync: Literal[True]) Future[str]

Send a request to the ZMQ server and wait for a reply.

Arguments * msg : bytes | str: the request message to send * block : bool = True: perform a blocking call

Result * bytes: the reply message when the type of msg is bytes and block = True * str: the reply message when the type of msg is str and block = True * otherwise a future with this bytes | str

Raises * InvalidState: when not connected * Disconnected: when the connection was lost during the request * OperationFailed: when the request failed or interrupted

restore_state(state_name: str | None = None) None
restore_state(state_name: str | None = None, *, block: Literal[False]) Future[None]
restore_state(state_name: str | None = None, *, sync: Literal[True]) None
restore_state(state_name: str | None = None, *, block: Literal[False], sync: Literal[True]) Future[None]

Restore the state from a file.

Arguments * state_name : str | None: the name of the state file to restore from

Result * None: when block = True * otherwise a future

save_state(state_name: str | None = None) None
save_state(state_name: str | None = None, *, block: Literal[False]) Future[None]
save_state(state_name: str | None = None, *, sync: Literal[True]) None
save_state(state_name: str | None = None, *, block: Literal[False], sync: Literal[True]) Future[None]

Save the current state to a file.

Arguments * state_name : str | None: the name of the state file to save to

Result * None: when block = True * otherwise a future

slow_poll_threshold_s = 1.0
property socket: Socket | None

The ZMQ socket used by this client, or None if not connected.

state() dict

Get the current state of the client.

state_file(state_name: str | None = None) str | None

Get the state file name.

stream(s: str, raw: bool = False) Stream

Get a Stream object for the given stream name.

streams() List[str]
streams(*, block: Literal[False]) Future[List[str]]
streams(*, sync: Literal[True]) List[str]
streams(*, block: Literal[False], sync: Literal[True]) Future[List[str]]

Get the list of available streams.

Arguments * block : bool = True: perform a blocking call

Result * List[str]: the list of stream names when block = True * otherwise a future with this List[str]

sync() SyncZmqClient
time() Object | None
timestamp_to_time(t: float | None = None) float
trace_poll_interval_s = 0.5
trace_threshold_s = 0.1
version() str
version(*, block: Literal[False]) Future[str]
version(*, sync: Literal[True]) str
version(*, block: Literal[False], sync: Literal[True]) Future[str]

Get the version string.

Arguments * block : bool = True: perform a blocking call

Result * str: the version string, which is empty when not supported, when block = True * otherwise a future with this str

write_mem(pointer: int, data: bytearray) None
write_mem(pointer: int, data: bytearray, *, block: Literal[False]) Future[None]
write_mem(pointer: int, data: bytearray, *, sync: Literal[True]) None
write_mem(pointer: int, data: bytearray, *, block: Literal[False], sync: Literal[True]) Future[None]

Write memory to the connected device.

Arguments * pointer : int: the memory address to write to * data : bytearray: the data to write * block : bool = True: perform a blocking call

Result * None: when block = True * otherwise a future

Raises * NotSupported: when the WriteMem command is not supported by the server * OperationFailed: when the WriteMem command failed

Protocol layers

class libstored.protocol.ProtocolLayer(*args, **kwargs)

Base class for all protocol layers.

Layers can be stacked (aka wrapped); the top (inner) layer is the application layer, the bottom (outer) layer is the physical layer.

Encoding means sending data down the stack (towards the physical layer), decoding means receiving data up the stack (towards the application layer).

AsyncCallback

alias of Callable[[bytes | bytearray | memoryview | str], Coroutine[Any, Any, None]]

Callback

alias of Callable[[bytes | bytearray | memoryview | str], Any]

Packet: TypeAlias = bytes | bytearray | memoryview | str
activity() None

Mark that there was activity on this layer.

async async_except(e: BaseException) None
property async_except_hook: Callable[[BaseException], Coroutine[Any, Any, None]]
async close() None

Close the layer and release resources.

async decode(data: bytes | bytearray | memoryview | str) None

Decode data received from the lower layer.

async default_async_except_hook(e: BaseException) None
property down: ProtocolLayer | None
async encode(data: bytes | bytearray | memoryview | str) None

Encode data for transmission.

last_activity() float

Get the time of the last activity on this layer or any lower layer.

property mtu: int | None

Maximum Transmission Unit (MTU) for this layer. Return None when there is no limit.

name = 'layer'
async timeout() None

Trigger maintenance actions when a timeout occurs.

property up: ProtocolLayer | None
wrap(layer: ProtocolLayer) None

Wrap this layer around another layer.

class libstored.protocol.AsciiEscapeLayer(*args, **kwargs)

Layer that escapes non-printable ASCII characters.

class libstored.protocol.TerminalLayer(fdout: str | int | Callable[[bytes | bytearray | memoryview | str], Any] | None = 1, ignoreEscapesTillFirstEncode: bool = True, *args, **kwargs)

A ProtocolLayer that encodes debug messages in terminal escape codes. Non-debug messages are passed through unchanged.

class libstored.protocol.PubTerminalLayer(bind: str = '*:19027', *args, context: Context | None = None, **kwargs)

A TerminalLayer (term), that also forwards all non-debug data over a PUB socket.

class libstored.protocol.RepReqCheckLayer(timeout_s: float = 1, *args, **kwargs)

A ProtocolLayer that checks that requests and replies are matched. It triggers timeout() when a reply is not received in time.

class libstored.protocol.SegmentationLayer(mtu: str | int | None = None, *args, **kwargs)

A ProtocolLayer that segments and reassembles data according to a simple protocol with ‘E’ (end) and ‘C’ (continue) markers.

class libstored.protocol.DebugArqLayer(timeout_s: float = 1, *args, **kwargs)

A ProtocolLayer that implements a simple ARQ protocol for debugging.

class libstored.protocol.Crc8Layer(*args, **kwargs)

ProtocolLayer to add and check integrity using a CRC8.

class libstored.protocol.Crc16Layer(*args, **kwargs)

ProtocolLayer to add and check integrity using a CRC16.

class libstored.protocol.Crc32Layer(*args, **kwargs)

ProtocolLayer to add and check integrity using a CRC32.

class libstored.protocol.LoopbackLayer(*args, **kwargs)

A ProtocolLayer that loops back all data.

class libstored.protocol.RawLayer(*args, **kwargs)

A ProtocolLayer that just forwards raw data as bytes.

class libstored.protocol.MuxLayer(default: int | str = 0, repeat_interval: float = 1, *args, **kwargs)

A ProtocolLayer that multiplexes data to different upper layers based on an channel identifier.

class libstored.protocol.StdinLayer(*args, **kwargs)

A terminal layer that reads from stdin.

class libstored.protocol.StdioLayer(cmd, *args, **kwargs)

A protocol layer that runs a subprocess and connects to its stdin/stdout.

class libstored.protocol.PrintLayer(prefix: tuple | str | None = None, *args, **kwargs)

A protocol layer that prints all data sent and received.

class libstored.protocol.SerialLayer(*, drop_s: float | None = 1, **kwargs)
class libstored.protocol.FileLayer(file: str | tuple[str, str], *args, **kwargs)

A protocol layer that reads/writes a file for I/O.

class libstored.protocol.ZmqSocketClient(*args, server: str = 'localhost', port: int = default_port, type: int = zmq.DEALER, context: Context | None = None, **kwargs)
class libstored.protocol.ZmqSocketClient(connect: str, *args, context: Context | None = None, type: int = zmq.DEALER, **kwargs)

Generic ZMQ client socket layer.

This layer is expected to be at the bottom of the protocol stack. Received data is passed up the stack.

class libstored.protocol.ZmqSocketServer(*args, type: int = zmq.DEALER, listen: str = '*', port: int = default_port, context: Context | None = None, **kwargs)
class libstored.protocol.ZmqSocketServer(bind: str, *args, type: int = zmq.DEALER, context: Context | None = None, **kwargs)

Generic ZMQ server (listening) socket layer.

This layer is expected to be at the top of the protocol stack. Received data is passed down the stack.

class libstored.protocol.ZmqServer(*args, listen: str = '*', port: int = default_port, context: Context | None = None, **kwargs)
class libstored.protocol.ZmqServer(bind: str, *args, context: Context | None = None, **kwargs)

A ZMQ Server, for REQ/REP debug messages.

This can be used to create a bridge from an arbitrary interface to ZMQ, which in turn can be used to connect a libstored.asyncio.ZmqClient to.

Protocol stack

class libstored.protocol.ProtocolStack(layers: list[ProtocolLayer], *args, **kwargs)

Composition of a stack of layers.

The given stack assumes that the layers are already wrapped. At index 0, the application layer is expected, at index -1 the physical layer.

class Iterator(stack)
async close() None

Close the layer and release resources.

async decode(data: bytes | bytearray | memoryview | str) None

Decode data received from the lower layer.

async encode(data: bytes | bytearray | memoryview | str) None

Encode data for transmission.

last_activity() float

Get the time of the last activity on this layer or any lower layer.

property mtu: int | None

Maximum Transmission Unit (MTU) for this layer. Return None when there is no limit.

name = 'stack'
libstored.protocol.register_layer_type(layer_type: Type[ProtocolLayer]) None

Register a new protocol layer type. The layer type must be a subclass of ProtocolLayer with a unique name.

libstored.protocol.unregister_layer_type(layer_type: Type[ProtocolLayer]) None

Unregister a protocol layer type.

libstored.protocol.get_layer_type(name: str) Type[ProtocolLayer]

Get a protocol layer type by name.

libstored.protocol.get_layer_types() list[Type[ProtocolLayer]]

Get the list of registered protocol layer types.

libstored.protocol.build_stack(description: str) ProtocolLayer

Construct the protocol stack from a description.

The description is a comma-separated string with layer ids. If the layer has a parameter, it can be specified. The stack is constructed top-down in order of the specified layers.

Grammar: ( <name> ( = <value> ) ? ) (, <name> ( = <value> ) ? ) *

libstored.protocol.stack(layers: list[ProtocolLayer], /) ProtocolLayer
libstored.protocol.stack(layers: ProtocolLayer, /, *args) ProtocolLayer

Create a ProtocolStack from a list of layers.