Base classes
ZmqClient
- class libstored.ZmqClient(host: str = 'localhost', port: int = 19026, multi: bool = False, timeout: float | None = None, context: None | Context = None, t: str | None = None, use_state: str | None = None, *args, **kwargs)
Asynchronous ZMQ client.
This client can connect to both the libstored.zmq_server.ZmqServer and stored::DebugZmqLayer.
- acquire_macro() str | None
- acquire_macro(*, block: Literal[False]) Future[str | None]
- acquire_macro(*, sync: Literal[True]) str | None
- acquire_macro(*, block: Literal[False], sync: Literal[True]) Future[str | None]
Get a free macro name.
If no macro name is available, None is returned. None can still be used to create a Macro object, although that is less efficient.
Arguments * block : bool = True: perform a blocking call
Result * str | None: the macro name when block = True * otherwise a future with this str | None
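Most methods of this class share the block convention described above: a blocking call returns the result itself, and block = False returns a future. The following is a minimal sketch of that calling convention using only the standard library. The names call and _work are hypothetical stand-ins and not part of libstored, and a thread-based concurrent.futures.Future is used purely for illustration.

```python
from concurrent.futures import Future, ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=1)


def _work() -> str:
    # Hypothetical stand-in for a request to the server.
    return "m1"


def call(*, block: bool = True):
    """Return the result directly, or a future when block=False."""
    future = _executor.submit(_work)
    return future.result() if block else future


direct = call()                        # blocks until the result is available
pending = call(block=False)            # returns immediately with a future
deferred = pending.result(timeout=5)   # collect the result later

_executor.shutdown(wait=True)
```

With a future, the caller decides when to wait, so several requests can be started before any result is collected.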
- alias(obj: str | Object, prefer: str | None = None, temporary: bool = True, permanentRef: Any = None) str | None
- alias(obj: str | Object, prefer: str | None = None, temporary: bool = True, permanentRef: Any = None, *, block: Literal[False]) Future[str | None]
- alias(obj: str | Object, prefer: str | None = None, temporary: bool = True, permanentRef: Any = None, *, sync: Literal[True]) str | None
- alias(obj: str | Object, prefer: str | None = None, temporary: bool = True, permanentRef: Any = None, *, block: Literal[False], sync: Literal[True]) Future[str | None]
Assign an alias to an object.
Arguments * obj : str | Object: the object to assign an alias to, or its name * prefer : str | None = None: the preferred alias, or None to get any available one * temporary : bool = True: if True, assign a temporary alias, otherwise a permanent one * permanentRef : Any = None: when assigning a permanent alias, a reference to the user of this alias.
Result * str | None: the assigned alias, or None when no alias could be assigned, when block = True * otherwise a future with this str | None
- capabilities() str
- capabilities(*, block: Literal[False]) Future[str]
- capabilities(*, sync: Literal[True]) str
- capabilities(*, block: Literal[False], sync: Literal[True]) Future[str]
Get the capabilities of the connected ZMQ server.
Arguments * block : bool = True: perform a blocking call
Result * str with the capabilities when block = True * otherwise a future
- close() None
- close(*, block: Literal[False]) Future[None]
- close(*, sync: Literal[True]) None
- close(*, block: Literal[False], sync: Literal[True]) Future[None]
Alias for disconnect().
- connect(host: str | None = None, port: int | None = None, multi: bool | None = None, default_state: bool = False) None
- connect(host: str | None = None, port: int | None = None, multi: bool | None = None, default_state: bool = False, *, block: Literal[False]) Future[None]
- connect(host: str | None = None, port: int | None = None, multi: bool | None = None, default_state: bool = False, *, sync: Literal[True]) None
- connect(host: str | None = None, port: int | None = None, multi: bool | None = None, default_state: bool = False, *, block: Literal[False], sync: Literal[True]) Future[None]
Connect to the ZMQ server.
Arguments * host : str | None = None: the host to connect to, or None to use the configured host * port : int | None = None: the port to connect to, or None to use the configured port * multi : bool | None = None: whether to use multi-client safe commands, or None to use the configured value * default_state : bool = False: if True, do not restore the state from the state file * block : bool = True: perform a blocking call
Result * None: when block = True * otherwise a future
- property context: Context
The ZMQ context used by this client.
- disconnect() None
- disconnect(*, block: Literal[False]) Future[None]
- disconnect(*, sync: Literal[True]) None
- disconnect(*, block: Literal[False], sync: Literal[True]) Future[None]
Disconnect from the ZMQ server.
Arguments * block : bool = True: perform a blocking call
Result * None: when block = True * otherwise a future
- echo(msg: str) str
- echo(msg: str, *, block: Literal[False]) Future[str]
- echo(msg: str, *, sync: Literal[True]) str
- echo(msg: str, *, block: Literal[False], sync: Literal[True]) Future[str]
Echo a message via the ZMQ server.
Arguments * msg : str: the message to echo * block : bool = True: perform a blocking call
Result * str: the echoed message when block = True * otherwise a future with this str
Raises * NotSupported: when the echo command is not supported by the server
- fast_poll_threshold_s = 0.9
- find(name: str, all=False) Object | Set[Object] | None
Find object(s) by name.
This function uses the previously retrieved list of objects.
- find_time() Object | None
- find_time(*, block: Literal[False]) asyncio.Future[Object | None]
- find_time(*, sync: Literal[True]) Object | None
- find_time(*, block: Literal[False], sync: Literal[True]) concurrent.futures.Future[Object | None]
Find the time object. Its name should start with /t and include a unit between parentheses, such as /t (s).
Arguments * block : bool = True: perform a blocking call
Result * Object | None: the time object when found, or None when not found, when block = True * otherwise a future with this Object | None
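The naming rule for the time object can be illustrated with a regular expression. This is only a sketch of one possible reading of the rule (name starts with /t and ends with a unit in parentheses). The pattern TIME_NAME and the helper time_unit are hypothetical, and libstored's actual matching may differ.

```python
import re

# Assumed reading of the rule: the name starts with "/t" and ends with "(unit)".
TIME_NAME = re.compile(r"^/t.*\((?P<unit>[^()]+)\)\s*$")


def time_unit(name: str):
    """Return the unit of a time-like object name, or None if it does not match."""
    m = TIME_NAME.match(name)
    return m.group("unit") if m else None


unit_s = time_unit("/t (s)")
unit_ms = time_unit("/t (ms)")
no_unit = time_unit("/t")
other = time_unit("/x (s)")
```

Note that under this literal reading any name beginning with /t qualifies, so a stricter pattern may be needed when other objects share that prefix.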
- property host: str
Configured or currently connected host.
- identification() str
- identification(*, block: Literal[False]) Future[str]
- identification(*, sync: Literal[True]) str
- identification(*, block: Literal[False], sync: Literal[True]) Future[str]
Get the identification string.
Arguments * block : bool = True: perform a blocking call
Result * str: the identification string, which is empty when not supported, when block = True * otherwise a future with this str
- is_connected() bool
Check if connected to the ZMQ server.
- list() List[Object]
- list(*, block: Literal[False]) asyncio.Future[List[Object]]
- list(*, sync: Literal[True]) List[Object]
- list(*, block: Literal[False], sync: Literal[True]) concurrent.futures.Future[List[Object]]
List the objects available.
Arguments * block : bool = True: perform a blocking call
Result * List[Object]: the list of objects when block = True * otherwise a future with this List[Object]
Raises * NotSupported: when the List command is not supported by the server * InvalidResponse: when the List command returned an invalid response
- macro(name: str | None, *args, **kwargs) Macro
Create a macro object for the given macro name.
- property multi: bool
Return whether the client uses a subset of the commands that are safe when multiple connections to the same ZMQ server are made.
- obj(x: str) Object
Get an object by name.
- property objects: List[Object]
- other_streams()
- periodic(interval_s: float, f: Callable, *args, name: str | None = None) Task
- periodic(interval_s: float, f: Callable, *args, name: str | None = None, block: Literal[False]) Future[Task] | Task
Run a function periodically while the client is alive.
Arguments * interval_s : float: the interval in seconds between calls; if 0, run as often as possible * f : Callable: the function to call periodically; can be a coroutine or a normal function * name : str | None = None: the name of the periodic task * *args: arguments to pass to f * block : bool = True: perform a blocking call
Result * asyncio.Task: the created periodic task when block = True * otherwise a future with this asyncio.Task
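The behaviour described for periodic() (plain functions or coroutines, and an interval of 0 meaning "as often as possible") can be sketched with plain asyncio. run_periodically and demo are hypothetical names. This is not libstored's implementation, only an illustration of the idea.

```python
import asyncio
import inspect


async def run_periodically(interval_s: float, f, *args) -> None:
    """Call f(*args) every interval_s seconds until cancelled."""
    while True:
        result = f(*args)
        if inspect.isawaitable(result):
            await result  # f was a coroutine function
        # sleep(0) still yields to the event loop, so other tasks can run.
        await asyncio.sleep(interval_s)


async def demo() -> list:
    calls = []
    done = asyncio.Event()

    def tick(label: str) -> None:
        calls.append(label)
        if len(calls) == 3:
            done.set()

    task = asyncio.create_task(run_periodically(0, tick, "tick"))
    await done.wait()
    task.cancel()  # stop the periodic task
    try:
        await task
    except asyncio.CancelledError:
        pass
    return calls


calls = asyncio.run(demo())
```

Cancelling the returned task is what ends the loop, which is why a task handle is a natural return value for such a function.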
- property port: int
Configured or currently connected port.
- read_mem(pointer: int, size: int) bytearray
- read_mem(pointer: int, size: int, *, block: Literal[False]) Future[bytearray]
- read_mem(pointer: int, size: int, *, sync: Literal[True]) bytearray
- read_mem(pointer: int, size: int, *, block: Literal[False], sync: Literal[True]) Future[bytearray]
Read memory from the connected device.
Arguments * pointer : int: the memory address to read from * size : int: the number of bytes to read * block : bool = True: perform a blocking call
Result * bytearray: the read data when block = True * otherwise a future with this bytearray
Raises * NotSupported: when the ReadMem command is not supported by the server * OperationFailed: when the ReadMem command failed
- release_alias(alias: str, permanentRef=None) None
- release_alias(alias: str, permanentRef=None, *, block: Literal[False]) Future[None]
- release_alias(alias: str, permanentRef=None, *, sync: Literal[True]) None
- release_alias(alias: str, permanentRef=None, *, block: Literal[False], sync: Literal[True]) Future[None]
Release an alias.
Arguments * alias : str: the alias to release * permanentRef : Any = None: when releasing a permanent alias, a reference to the user of this alias.
Result * None: when block = True * otherwise a future
- release_macro(m: str | bytes | Macro) None
- release_macro(m: str | bytes | Macro, *, block: Literal[False]) Future[None]
- release_macro(m: str | bytes | Macro, *, sync: Literal[True]) None
- release_macro(m: str | bytes | Macro, *, block: Literal[False], sync: Literal[True]) Future[None]
Release a macro.
Arguments * m : str | bytes | Macro: the macro to release, or its name * block : bool = True: perform a blocking call
Result * None: when block = True * otherwise a future
- req(msg: bytes) bytes
- req(msg: str) str
- req(msg: bytes, *, block: Literal[False]) Future[bytes]
- req(msg: str, *, block: Literal[False]) Future[str]
- req(msg: bytes, *, sync: Literal[True]) bytes
- req(msg: str, *, sync: Literal[True]) str
- req(msg: bytes, *, block: Literal[False], sync: Literal[True]) Future[bytes]
- req(msg: str, *, block: Literal[False], sync: Literal[True]) Future[str]
Send a request to the ZMQ server and wait for a reply.
Arguments * msg : bytes | str: the request message to send * block : bool = True: perform a blocking call
Result * bytes: the reply message when the type of msg is bytes and block = True * str: the reply message when the type of msg is str and block = True * otherwise a future with this bytes | str
Raises * InvalidState: when not connected * Disconnected: when the connection was lost during the request * OperationFailed: when the request failed or was interrupted
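The rule that the reply type follows the request type (bytes in, bytes out; str in, str out) can be sketched as below. The functions request and loopback are hypothetical, and the UTF-8 encoding is an assumption made for this example rather than a documented property of req().

```python
from typing import Callable, Union


def request(msg: Union[bytes, str], transport: Callable[[bytes], bytes]):
    """Send msg through transport and return the reply in the type of msg."""
    if isinstance(msg, str):
        # Assumption: text requests are encoded as UTF-8 on the wire.
        return transport(msg.encode("utf-8")).decode("utf-8")
    return transport(bytes(msg))


def loopback(data: bytes) -> bytes:
    # Hypothetical transport that returns the request unchanged.
    return data


text_reply = request("ping", loopback)
raw_reply = request(b"ping", loopback)
```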
- restore_state(state_name: str | None = None) None
- restore_state(state_name: str | None = None, *, block: Literal[False]) Future[None]
- restore_state(state_name: str | None = None, *, sync: Literal[True]) None
- restore_state(state_name: str | None = None, *, block: Literal[False], sync: Literal[True]) Future[None]
Restore the state from a file.
Arguments * state_name : str | None: the name of the state file to restore from
Result * None: when block = True * otherwise a future
- save_state(state_name: str | None = None) None
- save_state(state_name: str | None = None, *, block: Literal[False]) Future[None]
- save_state(state_name: str | None = None, *, sync: Literal[True]) None
- save_state(state_name: str | None = None, *, block: Literal[False], sync: Literal[True]) Future[None]
Save the current state to a file.
Arguments * state_name : str | None: the name of the state file to save to
Result * None: when block = True * otherwise a future
- slow_poll_threshold_s = 1.0
- property socket: Socket | None
The ZMQ socket used by this client, or None if not connected.
- state() dict
Get the current state of the client.
- state_file(state_name: str | None = None) str | None
Get the state file name.
- stream(s: str, raw: bool = False) Stream
Get a Stream object for the given stream name.
- streams() List[str]
- streams(*, block: Literal[False]) Future[List[str]]
- streams(*, sync: Literal[True]) List[str]
- streams(*, block: Literal[False], sync: Literal[True]) Future[List[str]]
Get the list of available streams.
Arguments * block : bool = True: perform a blocking call
Result * List[str]: the list of stream names when block = True * otherwise a future with this List[str]
- sync() SyncZmqClient
- time() Object | None
- timestamp_to_time(t: float | None = None) float
- trace_poll_interval_s = 0.5
- trace_threshold_s = 0.1
- version() str
- version(*, block: Literal[False]) Future[str]
- version(*, sync: Literal[True]) str
- version(*, block: Literal[False], sync: Literal[True]) Future[str]
Get the version string.
Arguments * block : bool = True: perform a blocking call
Result * str: the version string, which is empty when not supported, when block = True * otherwise a future with this str
- write_mem(pointer: int, data: bytearray) None
- write_mem(pointer: int, data: bytearray, *, block: Literal[False]) Future[None]
- write_mem(pointer: int, data: bytearray, *, sync: Literal[True]) None
- write_mem(pointer: int, data: bytearray, *, block: Literal[False], sync: Literal[True]) Future[None]
Write memory to the connected device.
Arguments * pointer : int: the memory address to write to * data : bytearray: the data to write * block : bool = True: perform a blocking call
Result * None: when block = True * otherwise a future
Raises * NotSupported: when the WriteMem command is not supported by the server * OperationFailed: when the WriteMem command failed
Protocol layers
- class libstored.protocol.ProtocolLayer(*args, **kwargs)
Base class for all protocol layers.
Layers can be stacked (aka wrapped); the top (inner) layer is the application layer, the bottom (outer) layer is the physical layer.
Encoding means sending data down the stack (towards the physical layer), decoding means receiving data up the stack (towards the application layer).
- AsyncCallback
alias of Callable[[bytes | bytearray | memoryview | str], Coroutine[Any, Any, None]]
- Callback
alias of Callable[[bytes | bytearray | memoryview | str], Any]
- Packet: TypeAlias = bytes | bytearray | memoryview | str
- activity() None
Mark that there was activity on this layer.
- async async_except(e: BaseException) None
- property async_except_hook: Callable[[BaseException], Coroutine[Any, Any, None]]
- async close() None
Close the layer and release resources.
- async decode(data: bytes | bytearray | memoryview | str) None
Decode data received from the lower layer.
- async default_async_except_hook(e: BaseException) None
- property down: ProtocolLayer | None
- async encode(data: bytes | bytearray | memoryview | str) None
Encode data for transmission.
- last_activity() float
Get the time of the last activity on this layer or any lower layer.
- property mtu: int | None
Maximum Transmission Unit (MTU) for this layer. Return None when there is no limit.
- name = 'layer'
- async timeout() None
Trigger maintenance actions when a timeout occurs.
- property up: ProtocolLayer | None
- wrap(layer: ProtocolLayer) None
Wrap this layer around another layer.
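To illustrate how wrapping, encode() and decode() relate, here is a simplified stand-in class. It is not libstored's ProtocolLayer: the class Layer, its log argument, and the reading that the wrapping layer becomes the lower (outer) one are assumptions made for this sketch.

```python
import asyncio
from typing import Optional


class Layer:
    """Simplified stand-in for a protocol layer (not libstored's class)."""

    def __init__(self, name: str, log: list):
        self.name = name
        self.log = log
        self.up: Optional["Layer"] = None    # towards the application
        self.down: Optional["Layer"] = None  # towards the physical layer

    def wrap(self, layer: "Layer") -> None:
        # Assumption: self becomes the lower (outer) layer around `layer`.
        self.up = layer
        layer.down = self

    async def encode(self, data: bytes) -> None:
        self.log.append(("encode", self.name, data))
        if self.down is not None:
            await self.down.encode(data)

    async def decode(self, data: bytes) -> None:
        self.log.append(("decode", self.name, data))
        if self.up is not None:
            await self.up.decode(data)


async def demo() -> list:
    log = []
    app = Layer("app", log)
    phy = Layer("phy", log)
    phy.wrap(app)
    await app.encode(b"req")  # travels app -> phy
    await phy.decode(b"rep")  # travels phy -> app
    return log


log = asyncio.run(demo())
```

The log shows the request passing through app and then phy, and the reply taking the reverse route.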
- class libstored.protocol.AsciiEscapeLayer(*args, **kwargs)
Layer that escapes non-printable ASCII characters.
- class libstored.protocol.TerminalLayer(fdout: str | int | Callable[[bytes | bytearray | memoryview | str], Any] | None = 1, ignoreEscapesTillFirstEncode: bool = True, *args, **kwargs)
A ProtocolLayer that encodes debug messages in terminal escape codes. Non-debug messages are passed through unchanged.
- class libstored.protocol.PubTerminalLayer(bind: str = '*:19027', *args, context: Context | None = None, **kwargs)
A TerminalLayer (term) that also forwards all non-debug data over a PUB socket.
- class libstored.protocol.RepReqCheckLayer(timeout_s: float = 1, *args, **kwargs)
A ProtocolLayer that checks that requests and replies are matched. It triggers timeout() when a reply is not received in time.
- class libstored.protocol.SegmentationLayer(mtu: str | int | None = None, *args, **kwargs)
A ProtocolLayer that segments and reassembles data according to a simple protocol with ‘E’ (end) and ‘C’ (continue) markers.
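A rough sketch of segmentation with continue and end markers follows. The placement of the marker as the last byte of each chunk, and the helper names segment and reassemble, are assumptions for illustration. The actual wire format should be checked against the libstored sources.

```python
def segment(data: bytes, mtu: int) -> list:
    """Split data into chunks of at most mtu bytes, marker included."""
    if mtu < 2:
        raise ValueError("mtu must leave room for payload and marker")
    size = mtu - 1
    chunks = [data[i:i + size] for i in range(0, len(data), size)] or [b""]
    # Assumption: the marker is appended as the last byte of each chunk.
    return [c + b"C" for c in chunks[:-1]] + [chunks[-1] + b"E"]


def reassemble(chunks: list) -> bytes:
    """Join chunks until the end marker is seen."""
    message = bytearray()
    for chunk in chunks:
        message += chunk[:-1]
        if chunk[-1:] == b"E":
            return bytes(message)
    raise ValueError("message is incomplete: no end marker received")


parts = segment(b"abcdef", mtu=4)
whole = reassemble(parts)
```

Each chunk stays within the MTU because one byte is reserved for the marker.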
- class libstored.protocol.DebugArqLayer(timeout_s: float = 1, *args, **kwargs)
A ProtocolLayer that implements a simple ARQ protocol for debugging.
- class libstored.protocol.Crc8Layer(*args, **kwargs)
A ProtocolLayer that appends a CRC8 to outgoing data and checks it on incoming data.
- class libstored.protocol.Crc16Layer(*args, **kwargs)
A ProtocolLayer that appends a CRC16 to outgoing data and checks it on incoming data.
- class libstored.protocol.Crc32Layer(*args, **kwargs)
A ProtocolLayer that appends a CRC32 to outgoing data and checks it on incoming data.
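The add-and-check principle behind the CRC layers can be sketched with a generic CRC-8. The polynomial 0x07 and initial value 0x00 below are a common textbook choice and are not claimed to be the parameters libstored uses. crc8, add_crc and check_crc are hypothetical helper names.

```python
def crc8(data: bytes, poly: int = 0x07, init: int = 0x00) -> int:
    """Bitwise CRC-8, most significant bit first, no reflection."""
    crc = init
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x80:
                crc = ((crc << 1) ^ poly) & 0xFF
            else:
                crc = (crc << 1) & 0xFF
    return crc


def add_crc(payload: bytes) -> bytes:
    """Append the checksum, as an encoding layer would."""
    return payload + bytes([crc8(payload)])


def check_crc(frame: bytes) -> bytes:
    """Return the payload, or raise when the checksum does not match."""
    if not frame:
        raise ValueError("empty frame")
    payload, received = frame[:-1], frame[-1]
    if crc8(payload) != received:
        raise ValueError("CRC mismatch")
    return payload


frame = add_crc(b"hello")
payload = check_crc(frame)
```

A receiver that sees a mismatch would typically drop the frame and leave recovery to a higher layer such as an ARQ layer.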
- class libstored.protocol.LoopbackLayer(*args, **kwargs)
A ProtocolLayer that loops back all data.
- class libstored.protocol.RawLayer(*args, **kwargs)
A ProtocolLayer that just forwards raw data as bytes.
- class libstored.protocol.MuxLayer(default: int | str = 0, repeat_interval: float = 1, *args, **kwargs)
A ProtocolLayer that multiplexes data to different upper layers based on a channel identifier.
- class libstored.protocol.StdinLayer(*args, **kwargs)
A terminal layer that reads from stdin.
- class libstored.protocol.StdioLayer(cmd, *args, **kwargs)
A protocol layer that runs a subprocess and connects to its stdin/stdout.
- class libstored.protocol.PrintLayer(prefix: tuple | str | None = None, *args, **kwargs)
A protocol layer that prints all data sent and received.
- class libstored.protocol.SerialLayer(*, drop_s: float | None = 1, **kwargs)
- class libstored.protocol.FileLayer(file: str | tuple[str, str], *args, **kwargs)
A protocol layer that reads/writes a file for I/O.
- class libstored.protocol.ZmqSocketClient(*args, server: str = 'localhost', port: int = default_port, type: int = zmq.DEALER, context: Context | None = None, **kwargs)
- class libstored.protocol.ZmqSocketClient(connect: str, *args, context: Context | None = None, type: int = zmq.DEALER, **kwargs)
Generic ZMQ client socket layer.
This layer is expected to be at the bottom of the protocol stack. Received data is passed up the stack.
- class libstored.protocol.ZmqSocketServer(*args, type: int = zmq.DEALER, listen: str = '*', port: int = default_port, context: Context | None = None, **kwargs)
- class libstored.protocol.ZmqSocketServer(bind: str, *args, type: int = zmq.DEALER, context: Context | None = None, **kwargs)
Generic ZMQ server (listening) socket layer.
This layer is expected to be at the top of the protocol stack. Received data is passed down the stack.
- class libstored.protocol.ZmqServer(*args, listen: str = '*', port: int = default_port, context: Context | None = None, **kwargs)
- class libstored.protocol.ZmqServer(bind: str, *args, context: Context | None = None, **kwargs)
A ZMQ Server, for REQ/REP debug messages.
This can be used to create a bridge from an arbitrary interface to ZMQ, to which a libstored.asyncio.ZmqClient can then connect.
Protocol stack
- class libstored.protocol.ProtocolStack(layers: list[ProtocolLayer], *args, **kwargs)
Composition of a stack of layers.
The given layers are assumed to be wrapped already. The application layer is expected at index 0 and the physical layer at index -1.
- class Iterator(stack)
- async close() None
Close the layer and release resources.
- async decode(data: bytes | bytearray | memoryview | str) None
Decode data received from the lower layer.
- async encode(data: bytes | bytearray | memoryview | str) None
Encode data for transmission.
- last_activity() float
Get the time of the last activity on this layer or any lower layer.
- property mtu: int | None
Maximum Transmission Unit (MTU) for this layer. Return None when there is no limit.
- name = 'stack'
- libstored.protocol.register_layer_type(layer_type: Type[ProtocolLayer]) None
Register a new protocol layer type. The layer type must be a subclass of ProtocolLayer with a unique name.
- libstored.protocol.unregister_layer_type(layer_type: Type[ProtocolLayer]) None
Unregister a protocol layer type.
- libstored.protocol.get_layer_type(name: str) Type[ProtocolLayer]
Get a protocol layer type by name.
- libstored.protocol.get_layer_types() list[Type[ProtocolLayer]]
Get the list of registered protocol layer types.
- libstored.protocol.build_stack(description: str) ProtocolLayer
Construct the protocol stack from a description.
The description is a comma-separated string with layer ids. If the layer has a parameter, it can be specified. The stack is constructed top-down in order of the specified layers.
Grammar: <name>(=<value>)?(,<name>(=<value>)?)*
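The grammar can be read as a comma-separated list of names, each with an optional =value. The following sketch parses such a description into (name, value) pairs. parse_description is a hypothetical helper, the layer ids in the example are placeholders, and values that themselves contain commas are not handled.

```python
def parse_description(description: str) -> list:
    """Parse 'name(=value)?(,name(=value)?)*' into (name, value) pairs."""
    layers = []
    for item in description.split(","):
        name, sep, value = item.partition("=")
        name = name.strip()
        if not name:
            raise ValueError(f"empty layer name in {description!r}")
        # A missing '=' means the layer has no parameter.
        layers.append((name, value if sep else None))
    return layers


# Placeholder layer ids; the pairs keep the order of the description.
parsed = parse_description("first,second=42,third")
```

Because the stack is constructed top-down, the first pair would correspond to the layer closest to the application.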
- libstored.protocol.stack(layers: list[ProtocolLayer], /) ProtocolLayer
- libstored.protocol.stack(layers: ProtocolLayer, /, *args) ProtocolLayer
Create a ProtocolStack from a list of layers.