Synchronizer

Distributed store synchronizer.

In a distributed system, every process has its own instance of a store. Synchronization between these instances is implemented by the stored::Synchronizer. The Synchronizer can be seen as a service, usually one per process, which knows all stores in that process and all communication channels to other processes. At regular intervals, it sends updates of locally modified data to the other Synchronizers.

The topology can be configured at will. In principle, a process can have any number of stores, any number of synchronizers (each of which handles any subset of the stores), and any number of connections to any other process in the system.

There are a few rules to keep in mind:

  • Only stored::Synchronizable stores can be handled by the Synchronizer. The store class must derive from it when it is defined; see stored::Synchronizable below.

  • To synchronize a store, one must define which store is the one that provides the initial value. Upon connection between Synchronizers, the store’s content is synchronized at once from one party to the other. Afterwards, updates are sent in both directions.

  • Writes to different objects in the same store by the same process are observed by every other process in the same order. All other write orders are undefined (like writes to objects of different stores by the same process, or writes to the same store by different processes), and can be observed to happen in a different order by different processes at the same time.

  • Writes to one object should only be done by one process. So, every process owns a subset of a store. If multiple processes write to the same object, behavior is undefined. That would be a race-condition anyway.

  • The communication is done in the store’s endianness. If a distributed system has processors with different endianness, all stores should be configured to be either little endian or big endian. Accessing a store in a non-native endianness might be a bit more expensive for that processor, but synchronization is cheaper.

  • Stores are identified by their (SHA-1) hash. This hash is computed over the full source code of the store (the .st file). So, only stores with the exact same definition, and therefore layout, can be synchronized.

Protocol

The protocol for synchronization consists of four messages. These are sent when appropriate, not in a request-response paradigm. There is no acknowledgement. Invalid messages are just ignored.

Hello

“I would like to have the full state and future changes of the given store (by hash). Send all updates to me using this reference.”

(h | H) <hash> <id>

The hash is returned by the hash() function of the store, and is sent including the null-terminator. The id is arbitrarily chosen by the Synchronizer, and is a 16-bit value in the store’s endianness (h indicates little endian, H is big).
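
As an illustration of this layout, the following minimal sketch builds the bytes of a Hello message. The function makeHello() and its parameters are hypothetical and not part of the library; it only assumes the format described above (command character, null-terminated hash, 16-bit id in the store’s endianness).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical helper (not a libstored function): build a Hello message.
// hash is the null-terminated hash string of the store, id is the reference
// chosen by the sender, littleEndian is the endianness of the store.
std::vector<uint8_t> makeHello(char const* hash, uint16_t id, bool littleEndian)
{
    assert(hash != nullptr);

    std::vector<uint8_t> msg;
    msg.push_back(littleEndian ? 'h' : 'H');

    // The hash is sent including its null-terminator.
    std::size_t hashLen = std::strlen(hash) + 1;
    msg.insert(msg.end(), hash, hash + hashLen);

    // The id is 16-bit, in the store's endianness.
    uint8_t lo = static_cast<uint8_t>(id & 0xff);
    uint8_t hi = static_cast<uint8_t>(id >> 8);
    if (littleEndian) {
        msg.push_back(lo);
        msg.push_back(hi);
    } else {
        msg.push_back(hi);
        msg.push_back(lo);
    }
    return msg;
}
```

For a (shortened, made-up) hash "ab" and id 0x1234, the little-endian message would be the six bytes 'h', 'a', 'b', 0x00, 0x34, 0x12.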

Welcome

This is a response to a Hello.

“You are welcome. Here is the full buffer state, upon your request, of the store with given reference. Send any updates to the store at your side to me, using my reference.”

(w | W) <hello id> <welcome id> <buffer>

The hello id is the id as received in the hello message (by the other party). The welcome id is chosen by this Synchronizer, in the same manner.

Update

“Your store, with given reference, has changed. The changes are attached.”

(u | U) <id> <updates>

The updates are a sequence of the triplet: <key> <length> <data>. The key and length have the most significant bytes stripped, which would always be 0. All values are in the store’s endianness (u is little, U is big endian).
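
A minimal sketch of how one such triplet could be serialized is given below. The helpers appendStripped() and appendUpdate() are hypothetical. The sketch assumes that key and length are stripped to the same number of bytes (width), that this width is known to both parties, and that data already holds the object’s bytes in the store’s endianness.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: append the 'width' least significant bytes of value,
// in the given endianness. The stripped bytes must all be 0.
void appendStripped(std::vector<uint8_t>& out, uint32_t value, unsigned width,
                    bool littleEndian)
{
    assert(width >= 1 && width <= 4);
    assert(width == 4 || (value >> (8 * width)) == 0);

    for (unsigned i = 0; i < width; i++) {
        unsigned shift = 8 * (littleEndian ? i : width - 1 - i);
        out.push_back(static_cast<uint8_t>(value >> shift));
    }
}

// Hypothetical helper: append one <key> <length> <data> triplet.
void appendUpdate(std::vector<uint8_t>& out, uint32_t key,
                  std::vector<uint8_t> const& data, unsigned width,
                  bool littleEndian)
{
    appendStripped(out, key, width, littleEndian);
    appendStripped(out, static_cast<uint32_t>(data.size()), width, littleEndian);
    out.insert(out.end(), data.begin(), data.end());
}
```

With width 2 in little endian, key 0x0102 and two data bytes 0xaa 0xbb would be sent as 02 01 02 00 aa bb.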

Proposal

The updates are a sequence defined as follows: <5 MSb key offset, 3 LSb length> <additional key bytes> <additional length bytes> <data>.

The key offset + 1 is the distance from the key of the previous entry in the updates sequence. Updates are sent in strictly ascending key order. The initial key is -1. For example, if the previous key was 10 and the 5 MSb indicate 3, then the next key is 10 + 3 + 1, so 14. If the 5 MSb are all 1 (so 31), an additional key byte follows the first byte (its value may be 0). This value is added to the key offset. If that value is 255, another key byte follows, and so on.

The 3 LSb of the first byte are decoded according to the following list:

  • 0: data length is 1

  • 1: data length is 2

  • 2: data length is 3

  • 3: data length is 4

  • 4: data length is 5

  • 5: data length is 6

  • 6: data length is 7, and an additional length byte follows (like the key offset)

  • 7: data length is 8

Using this scheme, when all variables change within the store, the overhead is always one byte per variable (plus additional length bytes, but this is uncommon and fixed for a given store). This is also the upper limit of the update message. If fewer variables change, the key offset may be larger, but the total size is always less.

The asymmetry of having 6 as indicator for additional length bytes is because this is an unlikely value (7 bytes data), and at least far less common than having 8 bytes of data.

The data is sent in the store’s endianness (u is little, U is big endian).
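
The proposed entry header can be sketched as follows. This is one interpretation of the proposal, not an existing implementation: appendExtra() and appendEntryHeader() are hypothetical names, the additional length bytes are assumed to be added to 7 (in the same way as the additional key bytes are added to 31), and a data length of 0 is assumed not to occur.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: append continuation bytes. A byte value of 255 means
// that another byte follows; all bytes are summed by the receiver.
static void appendExtra(std::vector<uint8_t>& out, uint32_t rest)
{
    while (rest >= 255) {
        out.push_back(255);
        rest -= 255;
    }
    out.push_back(static_cast<uint8_t>(rest));
}

// Hypothetical helper: append the header of one entry (everything but <data>).
// prevKey is the key of the previous entry, or -1 for the first entry.
void appendEntryHeader(std::vector<uint8_t>& out, int64_t prevKey, int64_t key,
                       uint32_t len)
{
    assert(key > prevKey); // strictly ascending keys
    assert(len >= 1);      // assumption: empty data is not encoded

    uint32_t offset = static_cast<uint32_t>(key - prevKey - 1);
    uint8_t keyBits = offset < 31 ? static_cast<uint8_t>(offset) : 31;

    uint8_t lenBits;
    if (len <= 6)
        lenBits = static_cast<uint8_t>(len - 1); // 0..5: length 1..6
    else if (len == 8)
        lenBits = 7;                             // 7: length 8
    else
        lenBits = 6;                             // 6: length 7 + extra bytes

    out.push_back(static_cast<uint8_t>((keyBits << 3) | lenBits));
    if (keyBits == 31)
        appendExtra(out, offset - 31); // additional key bytes
    if (lenBits == 6)
        appendExtra(out, len - 7);     // additional length bytes
}
```

For the example above (previous key 10, next key 14) with 4 bytes of data, the header is the single byte (3 << 3) | 3 = 27.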

Bye

“I do not need any more updates of the given store (by hash, by id or all).”

(b | B) <hash>
(b | B) <id>
(b | B)

A bye using the id can be used to respond to another message that has an unknown id. Remnants of previous communication sessions can be cleaned up in this way.

b indicates that the id is little endian, B indicates big endian. For the other two variants, there is no difference in endianness, but both versions are defined for symmetry.
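
All four messages start with a command character whose case indicates the endianness. A receiver could classify an incoming message as in the following sketch. The types MessageType and Command and the function classify() are hypothetical; the sketch only looks at the first byte and leaves parsing of the remainder out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

enum class MessageType { Hello, Welcome, Update, Bye, Invalid };

struct Command {
    MessageType type;
    bool littleEndian; // only meaningful when type is not Invalid
};

// Hypothetical helper: determine the message type and endianness from the
// first byte. Lower case is little endian, upper case is big endian.
Command classify(uint8_t const* msg, std::size_t len)
{
    assert(msg != nullptr || len == 0);

    Command invalid = {MessageType::Invalid, false};
    if (msg == nullptr || len == 0)
        return invalid;

    switch (msg[0]) {
    case 'h': return {MessageType::Hello, true};
    case 'H': return {MessageType::Hello, false};
    case 'w': return {MessageType::Welcome, true};
    case 'W': return {MessageType::Welcome, false};
    case 'u': return {MessageType::Update, true};
    case 'U': return {MessageType::Update, false};
    case 'b': return {MessageType::Bye, true};
    case 'B': return {MessageType::Bye, false};
    default:  return invalid; // invalid messages are just ignored
    }
}
```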

stored::StoreJournal

class StoreJournal

A record of all changes within a store.

Every variable in the store registers updates in the journal. The journal keeps an administration based on the key of the variable. Every change has a sequence number, which is kind of a time stamp. This sequence number can be used to check which objects have changed since some point in time.

The current sequence number (‘now’) is bumped upon an encode or decode, when there have been changes in between.

Internally, only the last bytes of the sequence number are stored (short seq). Therefore, there is a window (now-ShortSeqWindow .. now) within which a short seq can be converted back to a real seq. Changes that are older than the safe margin (now-SeqLowerMargin) are automatically shifted in time to stay within the window. This may lead to some false positives when determining which objects have changed since an old seq number. This is safe behavior, but slightly less efficient for encoding updates.
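
The conversion between a full and a short seq can be sketched as follows. The functions toShort() and toSeq() are hypothetical, and the sketch assumes that the window spans the full 16-bit range; the actual ShortSeqWindow may be smaller.

```cpp
#include <cassert>
#include <cstdint>

typedef uint64_t Seq;
typedef uint16_t ShortSeq;

// Keep only the least significant 16 bits of the seq.
ShortSeq toShort(Seq seq)
{
    return static_cast<ShortSeq>(seq);
}

// Reconstruct the full seq of a change, given the current seq (now).
// This is only correct when the change is less than 2^16 steps old.
Seq toSeq(Seq now, ShortSeq s)
{
    // The age of the change, computed modulo 2^16.
    ShortSeq age = static_cast<ShortSeq>(toShort(now) - s);
    assert(age <= now);
    return now - age;
}
```

For example, with now = 0x20005, the short seq 0xFFF0 is 0x15 steps old and maps back to 0x1FFF0. A change older than the window would be mapped to a more recent seq, which is why old changes have to be shifted before they fall out of the window.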

The administration is a binary tree, stored in a std::vector. Every node in the tree contains the maximum seq of any node below it, so a search like ‘find objects with a seq higher than x’ can terminate early. The vector must be regenerated when elements are inserted or removed. This is expensive, but usually only happens during the initial phase of the application.
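
The early termination can be illustrated with a small, self-contained sketch. The Node layout and the functions rebuild() and forEachChanged() are hypothetical and chosen for this illustration only; the actual layout used by StoreJournal may differ. Here, the nodes are sorted by key, and the subtree of a range is rooted at the middle element of that range.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Node {
    uint32_t key;     // key of the object
    uint64_t seq;     // seq of the last change of this object
    uint64_t highest; // maximum seq within the subtree rooted at this node
};

// Recompute 'highest' for the subtree covering nodes[lo..hi).
// Returns the maximum seq in that range (0 for an empty range).
uint64_t rebuild(std::vector<Node>& nodes, std::size_t lo, std::size_t hi)
{
    assert(hi <= nodes.size());
    if (lo >= hi)
        return 0;

    std::size_t mid = lo + (hi - lo) / 2;
    uint64_t h = nodes[mid].seq;
    h = std::max(h, rebuild(nodes, lo, mid));
    h = std::max(h, rebuild(nodes, mid + 1, hi));
    nodes[mid].highest = h;
    return h;
}

// Call cb(key) for every node in nodes[lo..hi) with seq >= since,
// in ascending key order.
template <typename F>
void forEachChanged(std::vector<Node> const& nodes, std::size_t lo,
                    std::size_t hi, uint64_t since, F&& cb)
{
    assert(hi <= nodes.size());
    if (lo >= hi)
        return;

    std::size_t mid = lo + (hi - lo) / 2;
    if (nodes[mid].highest < since)
        return; // nothing in this subtree changed; terminate early

    forEachChanged(nodes, lo, mid, since, cb);
    if (nodes[mid].seq >= since)
        cb(nodes[mid].key);
    forEachChanged(nodes, mid + 1, hi, since, cb);
}
```

In this layout, a subtree without recent changes is skipped after inspecting only its root, so a search costs much less than a full scan when few objects have changed.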

A store has only one journal, via stored::Synchronizable. Multiple instances of stored::SyncConnection use the same journal.

Public Types

enum [anonymous]

Values:

enumerator ShortSeqWindow

Maximum offset of seq() that is a valid short seq.

enumerator SeqLowerMargin

Oldest margin where the short seq of changes should be moved.

enumerator SeqCleanThreshold

Threshold for clean().

using IterateChangedCallback = void(Key, void*)
typedef uint32_t Key

The key, as produced by a store.

The key of a store is size_t. Limit it to 32-bit, assuming that stores will not be bigger than 4G.

typedef uint64_t Seq

Timestamp of a change.

64-bit means that if it is bumped every ns, a wrap-around happens only after more than 500 years (2^64 ns is about 584 years).

typedef uint16_t ShortSeq

A short version of Seq, used in all administration.

This saves a lot of space, but limits handling timestamps to ShortSeqWindow.

typedef Key Size

The size of an object.

The 32-bit assumption is checked in the ctor.

Public Functions

StoreJournal(char const *hash, void *buffer, size_t size, StoreCallback *callback = nullptr)

Ctor.

Parameters:
~StoreJournal() = default
Seq bumpSeq()

Bump seq(), when required.

void changed(Key key, size_t len, bool insertIfNew = true)

Record a change.

Parameters:
  • key – of the object within the store this is the journal of

  • len – the length of the (changed) data of the object. This is usually constant, but may change if the object is a string, for example.

  • insertIfNew – when true, record the change, even if it is not in the journal yet

void clean(Seq oldest = 0)

Remove all elements from the administration older than the given threshold.

Parameters:

oldest – seq of oldest element to remain. If 0, elements older than SeqCleanThreshold before seq() are removed.

Seq decodeBuffer(void *&buffer, size_t &len)

Decode and process a full store’s buffer from stored::Synchronizer message.

Returns:

the seq number of the applied changes

Seq decodeUpdates(void *&buffer, size_t &len, bool recordAll, void *scratch)

Decode and apply updates from a stored::Synchronizer message.

scratch should be at least len + 3 bytes in size. It may overlap with buffer.

Seq encodeBuffer(ProtocolLayer &p, bool last = false)

Encode full store’s buffer as a stored::Synchronizer message.

Returns:

the seq number of the change set

void encodeHash(ProtocolLayer &p, bool last = false) const

Encode the store’s hash into a Synchronizer message.

Seq encodeUpdates(uint8_t *&buf, Seq sinceSeq)

Encode all updates of changed objects since the given update sequence number (inclusive).

Returns:

The sequence number of the most recent update, to be passed to the next invocation of encodeUpdates().

bool hasChanged(Key key, Seq since) const

Checks if the given object has changed since the given sequence number.

The sequence number is the value returned by encodeUpdates().

bool hasChanged(Seq since) const

Checks if any object has changed since the given sequence number.

The sequence number is the value returned by encodeUpdates().

char const *hash() const

Return the hash of the corresponding store.

template<typename F>
inline void iterateChanged(Seq since, F &&cb) const

Iterate all changes since the given seq.

The callback will receive the Key of the object that has changed since the given seq.

void iterateChanged(Seq since, IterateChangedCallback *cb, void *arg = nullptr) const

Iterate all changes since the given seq.

The callback will receive the Key of the object that has changed since the given seq, and the supplied arg.

void *keyToBuffer(Key key, Size len = 0, bool *ok = nullptr) const

Return the buffer of an object corresponding to the given key.

void reserveHeap(size_t storeVariableCount)

Pre-allocate memory for the given number of variables.

As long as the number of changed variables remains below this number, no heap operations are performed during synchronization.

Seq seq() const

Returns the current update sequence number.

This number indicates in which period objects have changed. Then, a SyncConnection can determine which objects have already and which have not yet been updated remotely.

Public Static Functions

static char const *decodeHash(void *&buffer, size_t &len)

Decode hash from a Synchronizer message.

Returns:

the hash, or an empty string upon decode error

static void encodeHash(ProtocolLayer &p, char const *hash, bool last = false)

Encode a hash into a Synchronizer message.

static uint8_t keySize(size_t bufferSize)

Compute key size in bytes given a buffer size.
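
As an illustration only, such a computation could look like the sketch below. The function keySizeSketch() is hypothetical and not necessarily how keySize() is implemented; it assumes that the key size is the minimum number of bytes required to represent the buffer size itself.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical sketch: the number of bytes needed to represent bufferSize.
uint8_t keySizeSketch(std::size_t bufferSize)
{
    uint8_t bytes = 0;
    while (bufferSize != 0) {
        bytes++;
        bufferSize >>= 8;
    }
    return bytes;
}
```

Under this assumption, a store with a buffer of up to 255 bytes uses 1-byte keys, and a store of 256 up to 65535 bytes uses 2-byte keys.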

class StoreCallback

Subclassed by stored::Synchronizable< Base >::TypedStoreCallback

Public Functions

StoreCallback() = default
virtual ~StoreCallback()
virtual bool doHookChanged() noexcept = 0
virtual bool doHookEntryRO() noexcept = 0
virtual bool doHookExitRO() noexcept = 0
virtual bool doHooks() noexcept = 0
virtual void hookChanged() noexcept = 0
virtual void hookChanged(Type::type, void *buffer, size_t len) noexcept = 0
virtual void hookEntryRO() noexcept = 0
virtual void hookEntryRO(Type::type type, void *buffer, size_t len) noexcept = 0
virtual void hookExitRO() noexcept = 0
virtual void hookExitRO(Type::type type, void *buffer, size_t len) noexcept = 0

stored::SyncConnection

class SyncConnection : public stored::ProtocolLayer

A one-to-one connection to synchronize one or more stores.

A SyncConnection is related to one stored::Synchronizer, and a protocol stack to one other party. Using this connection, multiple stores can be synchronized.

The protocol is straightforward: assume Synchronizer A wants to synchronize a store with Synchronizer B via a SyncConnection:

  • A sends ‘Hello’ to B to indicate that it wants the full store immediately and updates afterwards.

  • B sends ‘Welcome’ back to A, including the full store’s buffer.

  • When A has updates, it sends ‘Update’ to B.

  • When B has updates, it sends ‘Update’ to A.

  • If A does not need updates anymore, it sends ‘Bye’.

  • B can send ‘Bye’ to A too, but this will probably break the application, as A usually cannot handle this.

Public Types

typedef ProtocolLayer base
typedef uint16_t Id

Public Functions

SyncConnection(Synchronizer &synchronizer, ProtocolLayer &connection)

Ctor.

Parameters:
  • synchronizer – the Synchronizer that manages this connection

  • connection – the protocol stack that is to wrap this SyncConnection

virtual ~SyncConnection() override

Dtor.

virtual void decode(void *buffer, size_t len) final override

Decode a frame and forward the decoded frame to the upper layer.

The given buffer may be decoded in-place.

void drop(StoreJournal &store)

Deregister a given store.

bool isSynchronizing(StoreJournal &store) const

Returns whether the given store is synchronized over this connection.

StoreJournal::Seq process(StoreJournal &store, void *encodeBuffer)

Send out all updates of the given store.

The provided encodeBuffer must point to a scratch pad memory, large enough for the store’s MaxMessageSize.

virtual void reset() override

Reset the stack (top-down), and drop all messages.

void source(StoreJournal &store)

Use this connection as a source of the given store.

Synchronizer &synchronizer() const

Returns the Synchronizer that manages this connection.

Public Static Attributes

static char const Bye = 'b'
static char const Hello = 'h'
static char const Update = 'u'
static char const Welcome = 'w'

stored::Synchronizable

template<typename Base>
class Synchronizable : public Base

An extension of a store to be used by the stored::Synchronizer.

Assume you have MyStoreBase (which takes a template parameter Implementation), and the actual store implementation ActualStore. If it does not have to be synchronizable, you would inherit ActualStore from MyStoreBase<ActualStore>.

If it should be synchronizable, use Synchronizable like this:

class ActualStore : public stored::Synchronizable<stored::MyStoreBase<ActualStore> > {
    STORED_CLASS_NOCOPY(ActualStore)
public:
    typedef stored::Synchronizable<stored::MyStoreBase<ActualStore> > base;
    using typename base::Implementation;
    friend class stored::MyStoreBase<ActualStore>;
    ActualStore() is_default
    ...
};

Or with a few macros:

class ActualStore: public STORE_SYNC_BASE_CLASS(MyStoreBase, ActualStore) {
    STORE_SYNC_CLASS_BODY(MyStoreBase, ActualStore)
public:
    ActualStore() is_default
};

Now, ActualStore inherits from Synchronizable, which inherits from MyStoreBase. However, ActualStore is still used as the final implementation.

Subclassed by stored::ExampleFpga2DefaultFunctions< stored::Synchronizable< stored::ExampleFpga2Base< ExampleFpga2 > > >, stored::ExampleFpgaDefaultFunctions< stored::Synchronizable< stored::ExampleFpgaBase< ExampleFpga > > >

Public Types

enum [anonymous]

Values:

enumerator MaxMessageSize

Maximum size of any Synchronizer message for this store.

typedef base::Objects Objects

Public Functions

template<typename ...Args>
inline explicit Synchronizable(Args&&... args)
~Synchronizable() = default
inline StoreJournal &journal()
inline StoreJournal const &journal() const
inline operator StoreJournal&()
inline operator StoreJournal const&() const
inline void reserveHeap()

Reserve worst-case heap usage.

Afterwards, the store and synchronizer will not use any additional heap, which makes it possible to use them in a context where heap allocation is not safe, like an interrupt handler.

Friends

friend class TypedStoreCallback
class TypedStoreCallback : public stored::StoreJournal::StoreCallback

Public Functions

inline explicit TypedStoreCallback(Synchronizable &store)
~TypedStoreCallback() override = default
inline virtual bool doHookChanged() noexcept override
inline virtual bool doHookEntryRO() noexcept override
inline virtual bool doHookExitRO() noexcept override
inline virtual bool doHooks() noexcept override
inline virtual void hookChanged() noexcept override
inline virtual void hookChanged(Type::type type, void *buffer, size_t len) noexcept override
inline virtual void hookEntryRO() noexcept override
inline virtual void hookEntryRO(Type::type type, void *buffer, size_t len) noexcept override
inline virtual void hookExitRO() noexcept override
inline virtual void hookExitRO(Type::type type, void *buffer, size_t len) noexcept override

stored::Synchronizer

class Synchronizer

The service that manages synchronization of stores over SyncConnections.

A Synchronizer holds a set of stores, and a set of SyncConnections. A store can be synchronized over multiple connections simultaneously.

Public Functions

Synchronizer() = default

Ctor.

~Synchronizer()

Dtor.

SyncConnection const &connect(ProtocolLayer &connection)

Connect the given connection to this Synchronizer.

A stored::SyncConnection is instantiated on top of the given protocol stack. This SyncConnection is the OSI Application layer of the synchronization protocol.

Returns:

the stored::SyncConnection, which is valid until disconnect() is called

void disconnect(ProtocolLayer &connection)

Disconnect the given connection from this Synchronizer.

Provide the connection as given to connect() before.

inline void *encodeBuffer()

Return a buffer large enough to encode messages in.

bool isSynchronizing(StoreJournal &j) const
bool isSynchronizing(StoreJournal &j, SyncConnection &notOverConnection) const
template<typename Store>
inline void map(Synchronizable<Store> &store)

Register a store in this Synchronizer.

void process()

Process updates for all connections and all stores.

void process(ProtocolLayer &connection)

Process updates for all stores on the given connection.

StoreJournal::Seq process(ProtocolLayer &connection, StoreJournal &j)

Process updates for the given store on the given connection.

template<typename Store>
inline StoreJournal::Seq process(ProtocolLayer &connection, Synchronizable<Store> &store)

Process updates for the given store on the given connection.

void process(StoreJournal &j)

Process updates for the given journal on all connections.

template<typename Store>
inline void process(Synchronizable<Store> &store)

Process updates for the given store on all connections.

template<typename Store>
inline void syncFrom(Synchronizable<Store> &store, ProtocolLayer &connection)

Mark the connection to be a source of the given store.

The full store’s buffer is received from the Synchronizer via the connection. Afterwards, updates are exchanged bidirectionally.

StoreJournal *toJournal(char const *hash) const

Find a registered store given a hash.

Returns:

the store, or nullptr when not found

template<typename Store>
inline void unmap(Synchronizable<Store> &store)

Deregister a store from this Synchronizer.
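
Putting the member functions above together, a typical setup could follow the order sketched below. The function attach() is hypothetical. To keep the sketch compilable on its own, FakeSynchronizer, FakeStore and FakeLayer are stand-ins that only log the calls; in an application, a stored::Synchronizer, a stored::Synchronizable store and a stored::ProtocolLayer would be passed instead.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-ins for the real classes; they only record which calls are made.
struct FakeLayer {};
struct FakeStore {};
struct FakeSynchronizer {
    std::vector<std::string> log;
    void map(FakeStore&) { log.push_back("map"); }
    void connect(FakeLayer&) { log.push_back("connect"); }
    void syncFrom(FakeStore&, FakeLayer&) { log.push_back("syncFrom"); }
    void process() { log.push_back("process"); }
};

// Hypothetical helper: register a store and a connection at a synchronizer.
// When the other party provides the initial value of the store, the
// connection is additionally marked as the source of the store.
template <typename Sync, typename Store, typename Layer>
void attach(Sync& sync, Store& store, Layer& connection,
            bool remoteHasInitialValue)
{
    sync.map(store);
    sync.connect(connection);
    if (remoteHasInitialValue)
        sync.syncFrom(store, connection);
}
```

After this setup, the application is expected to call process() at regular intervals to send out pending updates.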