concurrency

Store definition

ExampleConcurrencyMain

// SPDX-FileCopyrightText: 2020-2023 Jochem Rutgers
//
// SPDX-License-Identifier: CC0-1.0

uint32 not synchronized to other thread

ExampleConcurrencyControl

// SPDX-FileCopyrightText: 2020-2023 Jochem Rutgers
//
// SPDX-License-Identifier: CC0-1.0

bool=true run
uint32 setpoint
int32=-1 override
uint32 actual

Application

// SPDX-FileCopyrightText: 2020-2023 Jochem Rutgers
//
// SPDX-License-Identifier: CC0-1.0

/*!
 * \file
 * \brief Example with concurrency and message passing for synchronization.
 *
 * Although threads are undesirable, as they make your application to a large
 * extent unpredictable, concurrency cannot always be avoided. This
 * example shows how you can synchronize stores between threads, such that
 * threads do not share memory (except for the communication channels, but that
 * is handled by libstored).
 *
 * The pattern discussed in this example resembles a microcontroller, which
 * runs the main application, while this application is interrupted by a timer,
 * such that a real-time control loop can be executed. So, the main application
 * is not real-time and may consume all CPU cycles left over from the control
 * loop. In this case, the control loop runs concurrently to the application,
 * with all threading issues you can imagine. libstored gives you
 * message-passing channels, such that the control loop (interrupt handler) and
 * the main application each have their own instance of the same store, which
 * are kept synchronized.
 *
 * Visualized, the setup of the application is as follows:
 *
 *
 * main()                                          interrupt handler
 * - background tasks                              - control loop
 *    |            |                                     |
 *    |            |                                     |
 * Main store     Control store                     Control store
 *    |              |   |                               |
 *    +--------------+   |                               |
 *    |                  |                               |
 * Debugger         Synchronizer                    Synchronizer
 *                       |                               |
 *                       |                               |
 *                       +--------- FifoLoopback --------+
 *
 *
 * So, the main application exposes its instances of the stores to the
 * Debugger.  The main()'s Control store is synchronized with the interrupt
 * handler's instance. The FifoLoopback is a thread-safe bidirectional
 * ProtocolLayer, with bounded FIFO memory. No dynamic allocation is done after
 * initialization. The FIFO is lock-free. However, you have to specify what
 * happens when it gets full (drop data, suspend for a while, etc.).
 *
 * For demo purposes, the interrupt handler is in this example implemented as
 * an std::thread.
 */

#include "ExampleConcurrencyControl.h"
#include "ExampleConcurrencyMain.h"

#include <cassert>
#include <cerrno>
#include <chrono>
#include <cstdio>
#include <stored>
#include <string>
#include <thread>

// Make the Control store synchronizable.
class ControlStore
	: public STORE_T(
		  ControlStore, stored::Synchronizable, stored::ExampleConcurrencyControlBase) {
	STORE_CLASS(ControlStore, stored::Synchronizable, stored::ExampleConcurrencyControlBase)
public:
	ControlStore() = default;
};

// Use a bounded memory for the FifoLoopback channels. This is set at four
// times the maximum message a Synchronizer may send (which is usually only
// during initial setup when the full buffer is transmitted). You have to think
// about what is appropriate for your application.
using FifoLoopback = stored::FifoLoopback<ControlStore::MaxMessageSize * 4>;

// This is the 'interrupt handler'. In this example an std::thread.
static void
control(ControlStore& controlStore, stored::Synchronizer& synchronizer, FifoLoopback& loopback)
{
	// Specify the handler to be called when a Synchronizer message that is
	// pushed into the FifoLoopback does not fit anymore. In this case, we just
	// yield and wait a while.  You may also decide to abort the application,
	// if you determined that it should not happen in your case.
	loopback.b2a().setOverflowHandler([&]() {
		while(loopback.b2a().full())
			std::this_thread::yield();

		return true;
	});

	while(controlStore.run) {
		// Sleep for a while (or wait for an 'interrupt').
		std::this_thread::sleep_for(std::chrono::seconds(1));

		// The Synchronizer may push at most one message back into the
		// FifoLoopback channel when receiving one. Therefore, only try to
		// decode a message when we know that it will not block.
		while(loopback.b2a().space() >= ControlStore::MaxMessageSize)
			if(loopback.a2b().recv())
				break;

		// This 'control loop' allows you to override the actual value.
		// Otherwise, it steps towards the setpoint.
		if(controlStore.override_obj >= 0)
			controlStore.actual = (uint32_t)controlStore.override_obj;
		else if(controlStore.actual < controlStore.setpoint)
			controlStore.actual = controlStore.actual + 1;
		else if(controlStore.actual > controlStore.setpoint)
			controlStore.actual = controlStore.actual - 1;

		// Only send updates when we know it will fit in the FifoLoopback.
		if(loopback.b2a().space() >= ControlStore::MaxMessageSize)
			synchronizer.process();
	}

	// Send a bye message and terminate the connection.
	synchronizer.disconnect(loopback.b());
}

int main(int argc, char** argv)
{
	// Before starting the control thread, initialize all components.  This
	// will use the heap, but that is OK, as we are not in the 'interrupt
	// handler'.  After initialization, it is safe to use the store and
	// Synchronizer instances.
	stored::ExampleConcurrencyMain mainStore;
	ControlStore controlStore;
	ControlStore controlStoreOther;

	// Create the debugger for both stores.
	stored::Debugger debug("concurrency");
	debug.map(mainStore);
	debug.map(controlStore);

#ifdef STORED_HAVE_ZMQ
	// Create a ZeroMQ connection for the debugger.
	stored::DebugZmqLayer zmq;
	zmq.wrap(debug);
#endif

	// This is the Synchronizer for this thread.
	stored::Synchronizer synchronizer;
	synchronizer.map(controlStore);

	// This is the Synchronizer for the other thread.
	stored::Synchronizer synchronizerOther;
	synchronizerOther.map(controlStoreOther);

	// The thread-safe message-passing channel between both Synchronizers.
	FifoLoopback loopback;

	// In case the FIFO gets full, this thread just stalls...
	loopback.a2b().setOverflowHandler([&]() {
		// ...but there should not be a deadlock.
		assert(!loopback.a2b().full() || !loopback.b2a().full());

		while(loopback.a2b().full())
			std::this_thread::yield();

		return true;
	});

	// Connect the loopback channel.
	synchronizer.connect(loopback.a());
	synchronizerOther.connect(loopback.b());
	// Specify that the other thread will use the channel as the source of its
	// store instance.
	synchronizerOther.syncFrom(controlStoreOther, loopback.b());

	// We need a poller to check for ZeroMQ (debugger) messages.
	stored::Poller poller;
#ifdef STORED_HAVE_ZMQ
	stored::PollableZmqLayer pollableZmq(zmq, stored::Pollable::PollIn);
	if((errno = poller.add(pollableZmq))) {
		perror("Cannot register zmq to poller");
		return 1;
	}
#endif

	// When the actual value changes, it is printed to the console.
	auto prevActual = controlStore.actual.get();
	prevActual++; // Force to be different.

	// For demo purposes, you can specify the setpoint as a command line
	// argument.  If set, the application quits when the actual reaches the
	// setpoint.
	bool demo = false;
	if(argc >= 2) {
		try {
			controlStore.setpoint = (uint32_t)std::stoul(argv[1], nullptr, 0);
			demo = true;
			printf("Enabled demo mode with setpoint = %u\n",
			       (unsigned)controlStore.setpoint);
		} catch(...) {
		}
	}

	// Ready to start the control thread.
	std::thread controller(
		control, std::ref(controlStoreOther), std::ref(synchronizerOther),
		std::ref(loopback));

	// Main loop.
	while(!demo || controlStore.run) {
		// Check for ZeroMQ input.
		auto const& res = poller.poll(100);
		if(res.empty()) {
			if(errno != EAGAIN)
				perror("Cannot poll");
		} else {
#ifdef STORED_HAVE_ZMQ
			if(zmq.recv())
				perror("Cannot recv");
#endif
		}

		// Check for Synchronizer messages from the other thread.
		loopback.b2a().recvAll();

		if(prevActual != controlStore.actual)
			printf("actual = %u\n", (unsigned)(prevActual = controlStore.actual));

		if(demo && prevActual == controlStore.setpoint)
			// Done, terminate.
			controlStore.run = false;

		// Push updates in our Control store to the other thread.
		synchronizer.process();
	}

	controller.join();
	return 0;
}
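
The bounded FIFO and its overflow handler can be illustrated without libstored. The following is a minimal sketch of a single-producer single-consumer ring buffer, not the FifoLoopback implementation; the class ToyFifo and all of its members are hypothetical names chosen for this illustration.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical bounded FIFO for one producer thread and one consumer thread.
// The capacity is fixed at compile time; push() and pop() do not allocate.
template <typename T, std::size_t Capacity>
class ToyFifo {
public:
	// The handler is called by push() while the FIFO is full. Return true
	// to retry, or false to drop the item.
	void setOverflowHandler(std::function<bool()> handler)
	{
		m_overflow = std::move(handler);
	}

	// To be called by the producer.
	bool full() const noexcept
	{
		return next(m_wp.load(std::memory_order_relaxed))
		       == m_rp.load(std::memory_order_acquire);
	}

	// Producer side only.
	bool push(T const& value)
	{
		while(full())
			if(!m_overflow || !m_overflow())
				return false; // Dropped.

		std::size_t wp = m_wp.load(std::memory_order_relaxed);
		m_buffer[wp] = value;
		// Publish the item to the consumer.
		m_wp.store(next(wp), std::memory_order_release);
		return true;
	}

	// Consumer side only.
	bool pop(T& value)
	{
		std::size_t rp = m_rp.load(std::memory_order_relaxed);
		if(rp == m_wp.load(std::memory_order_acquire))
			return false; // Empty.

		value = m_buffer[rp];
		// Hand the slot back to the producer.
		m_rp.store(next(rp), std::memory_order_release);
		return true;
	}

private:
	// One extra slot distinguishes a full FIFO from an empty one.
	static std::size_t next(std::size_t i) noexcept
	{
		return (i + 1) % (Capacity + 1);
	}

	std::array<T, Capacity + 1> m_buffer{};
	std::atomic<std::size_t> m_wp{0};
	std::atomic<std::size_t> m_rp{0};
	std::function<bool()> m_overflow;
};
```

Without a handler, this sketch drops data when it is full. A handler that calls std::this_thread::yield() and returns true makes the producer wait instead, which corresponds to the choice made in the example above.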

Output

When started like: concurrency 3

Enabled demo mode with setpoint = 3
actual = 0
actual = 1
actual = 2
actual = 3
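
This sequence follows from the stepping rule in control(): every iteration, actual moves one step towards setpoint, unless override is non-negative. A stand-alone sketch of that rule, without libstored, could look as follows; the function names step and trace are hypothetical and only serve this illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper mirroring the body of the control loop in control().
// A non-negative override replaces the actual value. Otherwise, the actual
// value moves one step towards the setpoint.
std::uint32_t step(std::uint32_t actual, std::uint32_t setpoint, std::int32_t overrideValue)
{
	if(overrideValue >= 0)
		return (std::uint32_t)overrideValue;
	if(actual < setpoint)
		return actual + 1;
	if(actual > setpoint)
		return actual - 1;
	return actual;
}

// Hypothetical helper: collect the successive actual values, starting at the
// initial value, until the setpoint is reached (no override active).
std::vector<std::uint32_t> trace(std::uint32_t actual, std::uint32_t setpoint)
{
	std::vector<std::uint32_t> values{actual};
	while(actual != setpoint) {
		actual = step(actual, setpoint, -1);
		values.push_back(actual);
	}
	return values;
}
```

Here, trace(0, 3) yields 0, 1, 2, 3, matching the values printed above. In the real application, these values reach main() through the Synchronizer, so they are printed as the updates arrive.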

Store reference

template<typename Base_, typename Implementation_>
class ExampleConcurrencyMainObjects

All ExampleConcurrencyMainBase’s objects.

Public Types

typedef Base_ Base
typedef Implementation_ Implementation

Public Members

impl::StoreVariable<Base, Implementation, uint32_t, 0u, 4> not_synchronized_to_other_thread

not synchronized to other thread

template<typename Implementation_>
class ExampleConcurrencyMainBase : public stored::ExampleConcurrencyMainObjects<ExampleConcurrencyMainBase<Implementation_>, Implementation_>

Base class with default interface of all ExampleConcurrencyMain implementations.

Although there are no virtual functions in the base class, subclasses can override them. The (lowest) subclass must pass the Implementation_ template parameter to its base, such that all calls from the base class can be directed to the proper overridden implementation.
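
This way of overriding without virtual functions is commonly known as the curiously recurring template pattern (CRTP). A minimal sketch of the idea, independent of libstored, is given below. The names Base, Default, Custom, call() and hook() are hypothetical and are not part of the generated store API.

```cpp
#include <cassert>

// Hypothetical base class. It has no virtual functions, but forwards calls
// to the Implementation_ type it receives as template parameter.
template <typename Implementation_>
class Base {
public:
	typedef Implementation_ Implementation;

	Implementation& implementation() noexcept
	{
		return static_cast<Implementation&>(*this);
	}

	// Calls hook() of the lowest subclass, resolved at compile time.
	int call()
	{
		return implementation().hook();
	}

	// Default behavior, used when the subclass does not override it.
	int hook()
	{
		return 0;
	}
};

// Uses the default hook().
class Default : public Base<Default> {};

// Overrides hook() without virtual dispatch.
class Custom : public Base<Custom> {
public:
	int hook()
	{
		return 42;
	}
};
```

In this sketch, Default().call() returns 0 and Custom().call() returns 42, even though the call originates from the base class in both cases.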

The base class cannot be instantiated. If a default implementation is required, one whose functions have no side effects, instantiate stored::ExampleConcurrencyMain. This class contains all data of all variables, so it can be large. Be careful when instantiating it on the stack. Heap allocation is fine. Static allocation is fine too, as the constructor and destructor are trivial.

To inherit the base class, you can use the following template:

class ExampleConcurrencyMain : public stored::store<ExampleConcurrencyMain, ExampleConcurrencyMainBase>::type {
    STORE_CLASS(ExampleConcurrencyMain, ExampleConcurrencyMainBase)
public:
    // Your class implementation, such as:
    ExampleConcurrencyMain() is_default
    // ...
};

Some compilers or tools may get confused by the inheritance using stored::store or stored::store_t. Alternatively, use STORE_T(...) instead, providing the template parameters of stored::store as macro arguments.

See also

stored::ExampleConcurrencyMainData

Subclassed by stored::ExampleConcurrencyMainDefaultFunctions< ExampleConcurrencyMainBase< ExampleConcurrencyMain > >

Public Types

enum [anonymous]

Values:

enumerator ObjectCount

Number of objects in the store.

enumerator VariableCount

Number of variables in the store.

enumerator FunctionCount

Number of functions in the store.

enumerator BufferSize

Buffer size.

typedef Implementation_ Implementation

Type of the actual implementation, which is the (lowest) subclass.

typedef uintptr_t Key

Type of a key.

See also

bufferToKey()

typedef Map<String::type, Variant<Implementation>>::type ObjectMap

Map as generated by map().

typedef ExampleConcurrencyMainObjects<ExampleConcurrencyMainBase, Implementation_> Objects
typedef ExampleConcurrencyMainBase root

We are the root, as used by STORE_CLASS.

typedef ExampleConcurrencyMainBase self

Define self for stored::store.

Public Functions

inline ~ExampleConcurrencyMainBase()
inline Key bufferToKey(void const *buffer) const noexcept

Converts a variable’s buffer to a key.

A key is unique for all variables of the same store, but identical for the same variables across different instances of the same store class. Therefore, the key can be used to synchronize between instances of the same store. A key does not contain metadata, such as type or length. It is up to the synchronization library to make sure that these properties are handled well.

For synchronization, when hookEntryX() or hookEntryRO() is invoked, one can compute the key of the object that is accessed. The key can be used, for example, in a key-to-Variant map. When data arrives from another party, the key can be used to find the proper Variant in the map.

This way, data exchange is type-safe, as the Variant can check if the data format matches the expected type. However, one cannot process data if the key is not set yet in the map.
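
The key-to-Variant lookup described above can be sketched as follows. This is an illustration only: ToyStore, ToyVariant and applyUpdate are hypothetical stand-ins, and deriving the key from the offset within the buffer is an assumption made for this sketch, not a statement about how libstored computes keys.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>

typedef std::uintptr_t Key;

// Hypothetical stand-in for a store: two 32-bit variables in one buffer.
struct ToyStore {
	alignas(4) unsigned char buffer[8] = {};

	void* a() { return buffer; }     // Variable at offset 0.
	void* b() { return buffer + 4; } // Variable at offset 4.

	// Assumption: the key is the offset within the buffer, so it is the
	// same for the same variable in every instance.
	Key bufferToKey(void const* p) const noexcept
	{
		unsigned char const* c = static_cast<unsigned char const*>(p);
		assert(c >= buffer && c < buffer + sizeof(buffer));
		return (Key)(c - buffer);
	}
};

// Hypothetical stand-in for a Variant: a view on a variable that knows the
// expected data length.
struct ToyVariant {
	void* data;
	std::size_t size;

	// Rejects data that does not match the expected length.
	bool set(void const* src, std::size_t len)
	{
		if(len != size)
			return false;
		std::memcpy(data, src, len);
		return true;
	}
};

// Apply an update (key and raw data) that arrived from another instance.
bool applyUpdate(std::map<Key, ToyVariant>& map, Key key, void const* src, std::size_t len)
{
	std::map<Key, ToyVariant>::iterator it = map.find(key);
	if(it == map.end())
		return false; // Key not in the map yet; the data cannot be processed.
	return it->second.set(src, len);
}
```

The receiving side fills the map with the keys of its own instance. Because keys are identical across instances, a key computed by the sender finds the matching variable on the receiver.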

inline Type::type bufferToType(void const *buffer) noexcept

Returns the type of the variable, given its buffer.

inline Variant<Implementation> find(char const *name, size_t len = std::numeric_limits<size_t>::max()) noexcept

Finds an object with the given name.

Returns:

the object, or an invalid stored::Variant if not found.

template<typename T>
inline Function<T, Implementation> function(char const *name, size_t len = std::numeric_limits<size_t>::max()) noexcept

Finds a function with the given name.

The function, when it exists, must have the given (fixed) type.

inline Implementation const &implementation() const noexcept

Returns the reference to the implementation.

inline Implementation &implementation() noexcept

Returns the reference to the implementation.

template<typename F>
inline void list(F &&f) noexcept

Calls a callback for every object in the longDirectory().

See also

stored::list()

template<typename F>
inline void list(F f, void *arg, char const *prefix, String::type *nameBuffer) noexcept

Calls a callback for every object in the longDirectory().

See also

stored::list()

template<typename F>
inline void list(F f, void *arg, char const *prefix = nullptr) noexcept

Calls a callback for every object in the longDirectory().

See also

stored::list()

inline uint8_t const *longDirectory() const noexcept

Returns the long directory.

When not available, the short directory is returned.

inline ObjectMap map(char const *prefix = nullptr)

Creates a name-to-Variant map for the store.

Generating the map may be expensive and the result is not cached.

inline char const *name() const noexcept

Returns the name of the store, which can be used as prefix for stored::Debugger.

inline uint8_t const *shortDirectory() const noexcept

Returns the short directory.

template<typename T>
inline Variable<T, Implementation> variable(char const *name, size_t len = std::numeric_limits<size_t>::max()) noexcept

Finds a variable with the given name.

The variable, when it exists, must have the given (fixed) type.

Public Members

impl::StoreVariable<Base, Implementation, uint32_t, 0u, 4> not_synchronized_to_other_thread

not synchronized to other thread

Public Static Functions

template<typename T>
static inline constexpr FreeFunction<T, Implementation> freeFunction(char const *name, size_t len = std::numeric_limits<size_t>::max()) noexcept

Finds a function with the given name.

The function, when it exists, must have the given (fixed) type. It is returned as a free function; it is not bound yet to a specific store instance. This function is constexpr for C++14.

template<typename T>
static inline constexpr FreeVariable<T, Implementation> freeVariable(char const *name, size_t len = std::numeric_limits<size_t>::max()) noexcept

Finds a variable with the given name.

The variable, when it exists, must have the given (fixed) type. It is returned as a free variable; it is not bound yet to a specific store instance. This function is constexpr for C++14.
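
The notion of a free variable resembles a C++ pointer to data member: it identifies which variable is meant, but not in which store instance. The sketch below shows that idea only. ToyStore, ToyFreeVariable and its apply() member are hypothetical names for this illustration and do not describe the actual stored::FreeVariable interface.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a store with two variables.
struct ToyStore {
	std::uint32_t setpoint = 0;
	std::uint32_t actual = 0;
};

// Hypothetical free variable: it remembers which member is meant, not which
// instance. It can be created at compile time.
template <typename T, typename Store>
class ToyFreeVariable {
public:
	constexpr explicit ToyFreeVariable(T Store::*member) noexcept
		: m_member(member)
	{}

	// Bind the free variable to a specific store instance.
	T& apply(Store& store) const noexcept
	{
		return store.*m_member;
	}

private:
	T Store::*m_member;
};
```

A single ToyFreeVariable<std::uint32_t, ToyStore> constructed from &ToyStore::actual can then be applied to any number of ToyStore instances, each time accessing that instance's own actual.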

static inline constexpr char const *hash() noexcept

Returns a unique hash of the store.

Friends

friend class impl::StoreFunction
friend class impl::StoreVariable
friend class impl::StoreVariantF
friend class impl::StoreVariantV
friend class stored::FreeVariable
friend class stored::Variant< void >
class ExampleConcurrencyMain : public stored::ExampleConcurrencyMainDefaultFunctions<ExampleConcurrencyMainBase<ExampleConcurrencyMain>>

Default ExampleConcurrencyMainBase implementation.

Public Functions

ExampleConcurrencyMain() = default

Default constructor.

template<typename Base_, typename Implementation_>
class ExampleConcurrencyControlObjects

All ExampleConcurrencyControlBase’s objects.

Subclassed by stored::ExampleConcurrencyControlBase< ControlStore >

Public Types

typedef Base_ Base
typedef Implementation_ Implementation

Public Members

impl::StoreVariable<Base, Implementation, uint32_t, 12u, 4> actual

actual

impl::StoreVariable<Base, Implementation, int32_t, 0u, 4> override_obj

override

impl::StoreVariable<Base, Implementation, bool, 4u, 1> run

run

impl::StoreVariable<Base, Implementation, uint32_t, 8u, 4> setpoint

setpoint

template<typename Implementation_>
class ExampleConcurrencyControlBase : public stored::ExampleConcurrencyControlObjects<ExampleConcurrencyControlBase<Implementation_>, Implementation_>

Base class with default interface of all ExampleConcurrencyControl implementations.

Although there are no virtual functions in the base class, subclasses can override them. The (lowest) subclass must pass the Implementation_ template parameter to its base, such that all calls from the base class can be directed to the proper overridden implementation.

The base class cannot be instantiated. If a default implementation is required, one whose functions have no side effects, instantiate stored::ExampleConcurrencyControl. This class contains all data of all variables, so it can be large. Be careful when instantiating it on the stack. Heap allocation is fine. Static allocation is fine too, as the constructor and destructor are trivial.

To inherit the base class, you can use the following template:

class ExampleConcurrencyControl : public stored::store<ExampleConcurrencyControl, ExampleConcurrencyControlBase>::type {
    STORE_CLASS(ExampleConcurrencyControl, ExampleConcurrencyControlBase)
public:
    // Your class implementation, such as:
    ExampleConcurrencyControl() is_default
    // ...
};

Some compilers or tools may get confused by the inheritance using stored::store or stored::store_t. Alternatively, use STORE_T(...) instead, providing the template parameters of stored::store as macro arguments.

See also

stored::ExampleConcurrencyControl

See also

stored::ExampleConcurrencyControlData

Subclassed by stored::ExampleConcurrencyControlDefaultFunctions< ExampleConcurrencyControlBase< ExampleConcurrencyControl > >

Public Types

enum [anonymous]

Values:

enumerator ObjectCount

Number of objects in the store.

enumerator VariableCount

Number of variables in the store.

enumerator FunctionCount

Number of functions in the store.

enumerator BufferSize

Buffer size.

typedef Implementation_ Implementation

Type of the actual implementation, which is the (lowest) subclass.

typedef uintptr_t Key

Type of a key.

See also

bufferToKey()

typedef Map<String::type, Variant<Implementation>>::type ObjectMap

Map as generated by map().

typedef ExampleConcurrencyControlObjects<ExampleConcurrencyControlBase, Implementation_> Objects
typedef ExampleConcurrencyControlBase root

We are the root, as used by STORE_CLASS.

typedef ExampleConcurrencyControlBase self

Define self for stored::store.

Public Functions

inline ~ExampleConcurrencyControlBase()
inline Key bufferToKey(void const *buffer) const noexcept

Converts a variable’s buffer to a key.

A key is unique for all variables of the same store, but identical for the same variables across different instances of the same store class. Therefore, the key can be used to synchronize between instances of the same store. A key does not contain metadata, such as type or length. It is up to the synchronization library to make sure that these properties are handled well.

For synchronization, when hookEntryX() or hookEntryRO() is invoked, one can compute the key of the object that is accessed. The key can be used, for example, in a key-to-Variant map. When data arrives from another party, the key can be used to find the proper Variant in the map.

This way, data exchange is type-safe, as the Variant can check if the data format matches the expected type. However, one cannot process data if the key is not set yet in the map.

inline Type::type bufferToType(void const *buffer) noexcept

Returns the type of the variable, given its buffer.

inline Variant<Implementation> find(char const *name, size_t len = std::numeric_limits<size_t>::max()) noexcept

Finds an object with the given name.

Returns:

the object, or an invalid stored::Variant if not found.

template<typename T>
inline Function<T, Implementation> function(char const *name, size_t len = std::numeric_limits<size_t>::max()) noexcept

Finds a function with the given name.

The function, when it exists, must have the given (fixed) type.

inline Implementation const &implementation() const noexcept

Returns the reference to the implementation.

inline Implementation &implementation() noexcept

Returns the reference to the implementation.

template<typename F>
inline void list(F &&f) noexcept

Calls a callback for every object in the longDirectory().

See also

stored::list()

template<typename F>
inline void list(F f, void *arg, char const *prefix, String::type *nameBuffer) noexcept

Calls a callback for every object in the longDirectory().

See also

stored::list()

template<typename F>
inline void list(F f, void *arg, char const *prefix = nullptr) noexcept

Calls a callback for every object in the longDirectory().

See also

stored::list()

inline uint8_t const *longDirectory() const noexcept

Returns the long directory.

When not available, the short directory is returned.

inline ObjectMap map(char const *prefix = nullptr)

Creates a name-to-Variant map for the store.

Generating the map may be expensive and the result is not cached.

inline char const *name() const noexcept

Returns the name of the store, which can be used as prefix for stored::Debugger.

inline uint8_t const *shortDirectory() const noexcept

Returns the short directory.

template<typename T>
inline Variable<T, Implementation> variable(char const *name, size_t len = std::numeric_limits<size_t>::max()) noexcept

Finds a variable with the given name.

The variable, when it exists, must have the given (fixed) type.

Public Members

impl::StoreVariable<Base, Implementation, uint32_t, 12u, 4> actual

actual

impl::StoreVariable<Base, Implementation, int32_t, 0u, 4> override_obj

override

impl::StoreVariable<Base, Implementation, bool, 4u, 1> run

run

impl::StoreVariable<Base, Implementation, uint32_t, 8u, 4> setpoint

setpoint

Public Static Functions

template<typename T>
static inline constexpr FreeFunction<T, Implementation> freeFunction(char const *name, size_t len = std::numeric_limits<size_t>::max()) noexcept

Finds a function with the given name.

The function, when it exists, must have the given (fixed) type. It is returned as a free function; it is not bound yet to a specific store instance. This function is constexpr for C++14.

template<typename T>
static inline constexpr FreeVariable<T, Implementation> freeVariable(char const *name, size_t len = std::numeric_limits<size_t>::max()) noexcept

Finds a variable with the given name.

The variable, when it exists, must have the given (fixed) type. It is returned as a free variable; it is not bound yet to a specific store instance. This function is constexpr for C++14.

static inline constexpr char const *hash() noexcept

Returns a unique hash of the store.

Friends

friend class impl::StoreFunction
friend class impl::StoreVariable
friend class impl::StoreVariantF
friend class impl::StoreVariantV
friend class stored::FreeVariable
friend class stored::Variant< void >
class ControlStore : public stored::Synchronizable<stored::ExampleConcurrencyControlBase<ControlStore>>

Public Functions

ControlStore() = default