Synchronizer
Distributed store synchronizer.
In a distributed system, every process has its own instance of a store. Synchronization between these instances is implemented by the stored::Synchronizer. The Synchronizer can be seen as a service, usually one per process, which knows all stores in that process and all communication channels to other processes. At regular intervals, it sends updates of locally modified data to the other Synchronizers.
The topology can be configured at will. In principle, a process can have any number of stores, any number of synchronizers (each handling any subset of the stores), and any number of connections to any other process in the system.
There are a few rules to keep in mind:
Only stored::Synchronizable stores can be handled by the Synchronizer. This has to be used correctly when the store is instantiated.
To synchronize a store, one must define which store is the one that provides the initial value. Upon connection between Synchronizers, the store’s content is synchronized at once from one party to the other. Afterwards, updates are sent in both directions.
Writes to different objects in the same store by the same process are observed by every other process in the same order. All other write orders are undefined (like writes to objects of different stores by the same process, or writes to the same store by different processes), and can be observed to happen in a different order by different processes at the same time.
Writes to one object should only be done by one process, so every process owns a subset of a store. If multiple processes write to the same object, behavior is undefined; that would be a race condition anyway.
The communication is done in the store’s endianness. If a distributed system has processors with different endianness, the stores should be configured to all-little or all-big endian. Accessing a store in a non-native endianness might be a bit more expensive for that processor, but synchronization is cheaper.
Stores are identified by their (SHA-1) hash. This hash is computed over the full source code of the store (the .st file). So, only stores with the exact same definition, and therefore layout, can be synchronized.
Protocol
The protocol for synchronization consists of four messages. These are sent when appropriate, not in a request-response paradigm. There is no acknowledgement. Invalid messages are just ignored.
Hello
“I would like to have the full state and future changes of the given store (by hash). Send all updates to me using this reference.”
(h | H) <hash> <id>
The hash is returned by the hash() function of the store, including the null-terminator. The id is arbitrarily chosen by the Synchronizer, and is 16-bit in the store’s endianness (h indicates little endian, H is big).
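As an illustration of the layout above, the following is a minimal sketch of how a Hello message could be assembled. The function name encodeHello and its signature are hypothetical and not part of the library; only the byte layout (command byte, null-terminated hash, 16-bit id) is taken from the description.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper, not part of the library: builds the bytes of a Hello
// message as <command byte> <hash incl. null terminator> <16-bit id>.
std::vector<std::uint8_t> encodeHello(std::string const& hash, std::uint16_t id, bool bigEndian)
{
    std::vector<std::uint8_t> msg;
    msg.push_back(static_cast<std::uint8_t>(bigEndian ? 'H' : 'h'));

    // The hash is sent as a string, including its null terminator.
    for(char c : hash)
        msg.push_back(static_cast<std::uint8_t>(c));
    msg.push_back(0);

    // The id is 16-bit, in the store's endianness.
    std::uint8_t const lo = static_cast<std::uint8_t>(id & 0xffU);
    std::uint8_t const hi = static_cast<std::uint8_t>(id >> 8);
    if(bigEndian) {
        msg.push_back(hi);
        msg.push_back(lo);
    } else {
        msg.push_back(lo);
        msg.push_back(hi);
    }
    return msg;
}
```

The same id therefore appears with swapped bytes in the h and H variants, while the hash bytes are identical in both.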
Welcome
This is a response to a Hello.
“You are welcome. Here is the full buffer state, upon your request, of the store with given reference. Any updates to the store at your side, provide them to me with my reference.”
(w | W) <hello id> <welcome id> <buffer>
The hello id is the id as received in the hello message (by the other party). The welcome id is chosen by this Synchronizer, in the same manner.
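The receiving side of a Welcome can be sketched as follows. This is an illustration only: the Welcome struct and decodeWelcome are hypothetical names, and the sketch assumes that both ids are 16-bit (like the Hello id) and that the store buffer occupies the remainder of the message.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical result type, not part of the library.
struct Welcome {
    bool valid;
    bool bigEndian;
    std::uint16_t helloId;      // id we sent in our Hello
    std::uint16_t welcomeId;    // id chosen by the other party
    std::uint8_t const* buffer; // points into the message (assumed: rest of message)
    std::size_t bufferLen;
};

// Reads a 16-bit value in the given byte order.
static std::uint16_t read16(std::uint8_t const* p, bool bigEndian)
{
    return bigEndian ? static_cast<std::uint16_t>((p[0] << 8) | p[1])
                     : static_cast<std::uint16_t>((p[1] << 8) | p[0]);
}

Welcome decodeWelcome(std::uint8_t const* msg, std::size_t len)
{
    Welcome w = {false, false, 0, 0, nullptr, 0};
    // A command byte and two 16-bit ids are required; invalid messages are ignored.
    if(len < 5 || (msg[0] != 'w' && msg[0] != 'W'))
        return w;

    w.valid = true;
    w.bigEndian = msg[0] == 'W';
    w.helloId = read16(msg + 1, w.bigEndian);
    w.welcomeId = read16(msg + 3, w.bigEndian);
    w.buffer = msg + 5;
    w.bufferLen = len - 5;
    return w;
}
```

The hello id lets the receiver match the Welcome to the Hello it sent earlier; the welcome id is what it should use in its own Update messages towards the other party.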
Update
“Your store, with given reference, has changed. The changes are attached.”
(u | U) <id> <updates>
The updates are a sequence of the triplet: <key> <length> <data>. The key and length have the most significant bytes stripped, which would always be 0. All values are in the store’s endianness (u is little, U is big endian).
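A minimal sketch of the triplet encoding follows. All names are hypothetical, and it assumes that key and length are both written with the same fixed width, derived from the size of the store buffer; the description above only states that the always-zero most significant bytes are stripped.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: number of bytes needed to represent values up to maxValue.
unsigned bytesNeeded(std::size_t maxValue)
{
    unsigned n = 1;
    while((maxValue >>= 8) != 0)
        n++;
    return n;
}

// Appends value using exactly `width` bytes in the requested byte order.
void appendInt(std::vector<std::uint8_t>& out, std::size_t value, unsigned width, bool bigEndian)
{
    for(unsigned i = 0; i < width; i++) {
        unsigned const shift = 8U * (bigEndian ? width - 1U - i : i);
        out.push_back(static_cast<std::uint8_t>(value >> shift));
    }
}

// Appends one <key> <length> <data> triplet. The data is copied as is,
// assuming it is already in the store's endianness.
void appendUpdate(std::vector<std::uint8_t>& out, std::size_t key,
    std::uint8_t const* data, std::size_t len, unsigned width, bool bigEndian)
{
    appendInt(out, key, width, bigEndian);
    appendInt(out, len, width, bigEndian);
    out.insert(out.end(), data, data + len);
}
```

With a store buffer of at most 256 bytes, bytesNeeded(255) gives a width of 1, so every triplet carries two bytes of overhead; a 64 KiB buffer needs a width of 2 and four bytes of overhead per triplet.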
Proposal
The updates are a sequence defined as follows: <5 MSb key offset, 3 LSb length> <additional key bytes> <additional length bytes> <data>.
The key offset + 1 is the offset from the previous entry in the updates sequence. Updates are sent in strict ascending key offset. The initial key is -1. For example, if the previous key was 10 and the 5 MSb indicate 3, then the next key is 10 + 3 + 1, so 14. If the 5 MSb are all 1 (so 31), an additional key byte is added after the first byte (which may be 0). This value is added to the key offset. If that value is 255, another key byte is added, etc.
The 3 LSb of the first byte are decoded according to the following list:
0: data length is 1
1: data length is 2
2: data length is 3
3: data length is 4
4: data length is 5
5: data length is 6
6: data length is 7, and an additional length byte follows (like the key offset)
7: data length is 8
Using this scheme, when all variables change within the store, the overhead is always one byte per variable (plus additional length bytes, but this is uncommon and fixed for a given store). This is also the upper limit of the size of an update message. If fewer variables change, the key offsets may be larger, but the total size is always smaller.
The asymmetry of having 6 as indicator for additional length bytes is because this is an unlikely value (7 bytes data), and at least far less common than having 8 bytes of data.
The data is sent in the store’s endianness (u is little, U is big endian).
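The proposed header layout can be sketched as a small decoder. This is one reading of the proposal, not an existing implementation: the names Entry, readExtension and decodeHeader are hypothetical, and the sketch assumes that additional length bytes are accumulated on top of 7 in the same way as additional key bytes are accumulated on top of 31.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical type; this only illustrates the proposed header layout.
struct Entry {
    long key;         // key of the decoded entry
    std::size_t len;  // number of data bytes that follow the header
    std::size_t used; // number of header bytes consumed
};

// Reads extension bytes: every byte is added to the sum, and a byte of 255
// means that another extension byte follows.
static std::size_t readExtension(std::uint8_t const* p, std::size_t avail, std::size_t& pos, bool& ok)
{
    std::size_t sum = 0;
    for(;;) {
        if(pos >= avail) {
            ok = false; // truncated header
            return sum;
        }
        std::uint8_t const b = p[pos++];
        sum += b;
        if(b != 255)
            return sum;
    }
}

// Decodes one entry header, given the key of the previous entry (-1 initially).
// Returns false when the header is truncated.
bool decodeHeader(std::uint8_t const* p, std::size_t avail, long prevKey, Entry& e)
{
    if(avail == 0)
        return false;

    bool ok = true;
    std::size_t pos = 1;
    std::size_t offset = p[0] >> 3;     // 5 MSb: key offset
    unsigned const lenCode = p[0] & 7U; // 3 LSb: length code

    if(offset == 31)
        offset += readExtension(p, avail, pos, ok);

    std::size_t len;
    if(lenCode <= 5)
        len = lenCode + 1U; // codes 0..5 map to 1..6 bytes
    else if(lenCode == 7)
        len = 8;
    else
        len = 7 + readExtension(p, avail, pos, ok); // code 6

    e.key = prevKey + static_cast<long>(offset) + 1;
    e.len = len;
    e.used = pos;
    return ok;
}
```

For the example in the text, a first byte of 0x1b (key offset 3, length code 3) with a previous key of 10 decodes to key 14 with 4 data bytes, using a single header byte.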
Bye
“I do not need any more updates of the given store (by hash, by id or all).”
(b | B) <hash>
(b | B) <id>
(b | B)
A bye using the id can be used to respond to another message that has an unknown id. Remnants of previous communication sessions can be cleaned up in this way.
b indicates that the id is little endian, B indicates big endian. For the other two variants, there is no difference in endianness, but both versions are defined for symmetry.
stored::StoreJournal
-
class StoreJournal
A record of all changes within a store.
Every variable in the store registers updates in the journal. The journal keeps an administration based on the key of the variable. Every change has a sequence number, which is a kind of time stamp. This sequence number can be used to check which objects have changed since some point in time.
The current sequence number (‘now’) is bumped upon an encode or decode, when there have been changes in between.
Internally, only the last bytes of the sequence number are stored (short seq). Therefore, there is a window (now-ShortSeqWindow .. now) within which a short seq can be converted back to a real seq. Changes that are older than the safe margin (now-SeqLowerMargin) are automatically shifted in time to stay within the window. This may lead to some false positives when determining which objects have changed since an old seq number. This is safe behavior, but slightly less efficient for encoding updates.
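The conversion between a short seq and a real seq can be illustrated with modular arithmetic. The sketch below uses hypothetical function names and assumes the widest possible window for a 16-bit short seq (now - 65535 .. now); the actual values of ShortSeqWindow and SeqLowerMargin are not given here and are likely smaller.

```cpp
#include <cstdint>

typedef std::uint64_t Seq;
typedef std::uint16_t ShortSeq;

// Keep only the least significant 16 bits of a sequence number.
ShortSeq toShort(Seq seq)
{
    return static_cast<ShortSeq>(seq);
}

// Convert a short seq back to a full seq.
// Assumption: the original seq satisfies now - 65535 <= seq <= now.
Seq toLong(ShortSeq shortSeq, Seq now)
{
    // The difference modulo 2^16 is the age of the change.
    ShortSeq const age = static_cast<ShortSeq>(toShort(now) - shortSeq);
    return now - age;
}
```

A change that is exactly 65536 steps old has the same short seq as now, so it would be mistaken for a fresh change. This is why old entries have to be shifted forward before they leave the window, at the cost of a false positive.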
The administration is a binary tree, stored in a std::vector. Every node in the tree contains the maximum seq of any node below it, so a search like ‘find objects with a seq higher than x’ can terminate early. The vector must be regenerated when elements are inserted or removed. This is expensive, but usually only happens during the initial phase of the application.
A store has only one journal, via stored::Synchronizable. Multiple instances of stored::SyncConnection use the same journal.
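The early termination of a search in the tree administration can be sketched as follows. This is a simplified illustration with hypothetical names: it assumes an implicit layout where the children of node i are at 2i+1 and 2i+2, and it leaves out the ordering by key that a real lookup would need.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Node {
    std::uint32_t key;
    std::uint64_t seq;     // seq of the last change of this object
    std::uint64_t highest; // maximum seq of this node and all nodes below it
};

// Recomputes `highest` bottom-up; children of node i are at 2i+1 and 2i+2.
void updateHighest(std::vector<Node>& tree)
{
    for(std::size_t i = tree.size(); i-- > 0;) {
        tree[i].highest = tree[i].seq;
        for(std::size_t c = 2 * i + 1; c <= 2 * i + 2 && c < tree.size(); c++)
            tree[i].highest = std::max(tree[i].highest, tree[c].highest);
    }
}

// Collects the keys of all objects with seq >= since. Subtrees that cannot
// contain a match are skipped. `visited` counts the nodes actually inspected.
void findChanged(std::vector<Node> const& tree, std::size_t i, std::uint64_t since,
    std::vector<std::uint32_t>& out, std::size_t& visited)
{
    if(i >= tree.size())
        return;

    visited++;
    if(tree[i].highest < since)
        return; // nothing at or below this node is recent enough

    if(tree[i].seq >= since)
        out.push_back(tree[i].key);

    findChanged(tree, 2 * i + 1, since, out, visited);
    findChanged(tree, 2 * i + 2, since, out, visited);
}
```

When only a few objects changed recently, most subtrees are rejected at their root, so the search touches far fewer nodes than a linear scan over all objects.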
Public Types
-
enum [anonymous]
Values:
-
enumerator SeqLowerMargin
Oldest margin where the short seq of changes should be moved.
-
typedef uint32_t Key
The key, as produced by a store.
The key of a store is size_t. Limit it to 32-bit, assuming that stores will not be bigger than 4G.
-
typedef uint64_t Seq
Timestamp of a change.
64-bit means that if it is bumped every ns, a wrap-around happens after more than 500 years (2^64 ns is about 584 years).
-
typedef uint16_t ShortSeq
A short version of Seq, used in all administration.
This saves a lot of space, but limits handling timestamps to ShortSeqWindow.
Public Functions
-
StoreJournal(char const *hash, void *buffer, size_t size, StoreCallback *callback = nullptr)
Ctor.
- Parameters:
hash – the hash of the store
buffer – the buffer of the store
size – the size of buffer
callback – the stored::Synchronizable::TypedStoreCallback instance
-
~StoreJournal() = default
-
void changed(Key key, size_t len, bool insertIfNew = true)
Record a change.
- Parameters:
key – of the object within the store this is the journal of
len – the length of the (changed) data of the object. This is usually constant, but may change if the object is a string, for example.
insertIfNew – when true, record the change, even if it is not in the journal yet
-
void clean(Seq oldest = 0)
Remove all elements from the administration older than the given threshold.
- Parameters:
oldest – seq of oldest element to remain. If 0, remove everything older than SeqCleanThreshold before seq().
-
Seq decodeBuffer(void *&buffer, size_t &len)
Decode and process a full store’s buffer from a stored::Synchronizer message.
- Returns:
the seq number of the applied changes
-
Seq decodeUpdates(void *&buffer, size_t &len, bool recordAll, void *scratch)
Decode and apply updates from a stored::Synchronizer message.
scratch should be at least len + 3 bytes in size. It may overlap with buffer.
-
Seq encodeBuffer(ProtocolLayer &p, bool last = false)
Encode full store’s buffer as a stored::Synchronizer message.
- Returns:
the seq number of the change set
-
void encodeHash(ProtocolLayer &p, bool last = false) const
Encode the store’s hash into a Synchronizer message.
-
Seq encodeUpdates(uint8_t *&buf, Seq sinceSeq)
Encode all updates of changed objects since the given update sequence number (inclusive).
- Returns:
The sequence number of the most recent update, to be passed to the next invocation of encodeUpdates().
-
bool hasChanged(Key key, Seq since) const
Checks if the given object has changed since the given sequence number.
The sequence number is the value returned by encodeUpdates().
-
bool hasChanged(Seq since) const
Checks if any object has changed since the given sequence number.
The sequence number is the value returned by encodeUpdates().
-
char const *hash() const
Return the hash of the corresponding store.
-
template<typename F>
inline iterateChanged(Seq since, F &&cb) const
Iterate all changes since the given seq.
The callback will receive the Key of the object that has changed since the given seq.
-
void iterateChanged(Seq since, IterateChangedCallback *cb, void *arg = nullptr) const
Iterate all changes since the given seq.
The callback will receive the Key of the object that has changed since the given seq, and the supplied arg.
-
void *keyToBuffer(Key key, Size len = 0, bool *ok = nullptr) const
Return the buffer of an object corresponding to the given key.
-
void reserveHeap(size_t storeVariableCount)
Pre-allocate memory for the given number of variables.
As long as the number of changed variables remains below this number, no heap operations are performed during synchronization.
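Since the journal's administration lives in a std::vector, the effect of pre-allocation can be illustrated with plain std::vector::reserve. The sketch below is not the journal's code and the function name is hypothetical; it only shows the standard guarantee that appending within the reserved capacity does not reallocate.

```cpp
#include <cstddef>
#include <vector>

// Returns true when appending `count` elements did not move the storage,
// which means the vector did not have to reallocate.
bool appendWithoutReallocation(std::vector<int>& v, std::size_t count)
{
    int const* const before = v.data();
    std::size_t const capacity = v.capacity();

    for(std::size_t i = 0; i < count; i++)
        v.push_back(static_cast<int>(i));

    return v.data() == before && v.capacity() == capacity;
}
```

After v.reserve(16), appending up to 16 elements to an empty vector leaves the storage in place; only when the capacity is exceeded does the vector go back to the heap.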
-
Seq seq() const
Returns the current update sequence number.
This number indicates in which period objects have changed. Then, a SyncConnection can determine which objects have already and which have not yet been updated remotely.
Public Static Functions
-
static char const *decodeHash(void *&buffer, size_t &len)
Decode hash from a Synchronizer message.
- Returns:
the hash, or an empty string upon decode error
-
static void encodeHash(ProtocolLayer &p, char const *hash, bool last = false)
Encode a hash into a Synchronizer message.
-
static uint8_t keySize(size_t bufferSize)
Compute key size in bytes given a buffer size.
-
class StoreCallback
Subclassed by stored::Synchronizable< Base >::TypedStoreCallback
Public Functions
-
StoreCallback() = default
-
virtual ~StoreCallback()
-
virtual bool doHookChanged() noexcept = 0
-
virtual bool doHookEntryRO() noexcept = 0
-
virtual bool doHookExitRO() noexcept = 0
-
virtual bool doHooks() noexcept = 0
-
virtual void hookChanged() noexcept = 0
-
virtual void hookChanged(Type::type, void *buffer, size_t len) noexcept = 0
-
virtual void hookEntryRO() noexcept = 0
-
virtual void hookEntryRO(Type::type type, void *buffer, size_t len) noexcept = 0
-
virtual void hookExitRO() noexcept = 0
-
virtual void hookExitRO(Type::type type, void *buffer, size_t len) noexcept = 0
stored::SyncConnection
-
class SyncConnection : public stored::ProtocolLayer
A one-to-one connection to synchronize one or more stores.
A SyncConnection is related to one stored::Synchronizer, and a protocol stack to one other party. Using this connection, multiple stores can be synchronized.
The protocol is straightforward: assume Synchronizer A wants to synchronize a store with Synchronizer B via a SyncConnection:
A sends ‘Hello’ to B to indicate that it wants the full store immediately and updates afterwards.
B sends ‘Welcome’ back to A, including the full store’s buffer.
When A has updates, it sends ‘Update’ to B.
When B has updates, it sends ‘Update’ to A.
If A does not need updates anymore, it sends ‘Bye’.
B can send ‘Bye’ to A too, but this will probably break the application, as A usually cannot handle this.
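On the receiving side, the first byte of a frame identifies both the message type and the endianness of the fields that follow. The following is a minimal sketch of that classification, using the command characters from the protocol description; the enum, struct and function names are hypothetical.

```cpp
#include <cstdint>

enum class MessageType { Hello, Welcome, Update, Bye, Invalid };

struct Header {
    MessageType type;
    bool bigEndian; // upper case command characters indicate big endian
};

// Classifies a message by its first byte.
Header classify(std::uint8_t first)
{
    switch(first) {
    case 'h': return {MessageType::Hello, false};
    case 'H': return {MessageType::Hello, true};
    case 'w': return {MessageType::Welcome, false};
    case 'W': return {MessageType::Welcome, true};
    case 'u': return {MessageType::Update, false};
    case 'U': return {MessageType::Update, true};
    case 'b': return {MessageType::Bye, false};
    case 'B': return {MessageType::Bye, true};
    default:  return {MessageType::Invalid, false}; // invalid messages are ignored
    }
}
```

In the sequence above, A would see a Welcome and later Update frames, while B would see the Hello, Update frames and possibly a Bye.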
Public Functions
-
SyncConnection(Synchronizer &synchronizer, ProtocolLayer &connection)
Ctor.
- Parameters:
synchronizer – the Synchronizer that manages this connection
connection – the protocol stack that is to wrap this SyncConnection
-
virtual ~SyncConnection() override
Dtor.
-
virtual void decode(void *buffer, size_t len) final override
Decode a frame and forward the decoded frame to the upper layer.
The given buffer may be decoded in-place.
-
void drop(StoreJournal &store)
Deregister a given store.
-
bool isSynchronizing(StoreJournal &store) const
Returns if the given store is synchronized over this connection.
-
StoreJournal::Seq process(StoreJournal &store, void *encodeBuffer)
Send out all updates of the given store.
The provided
encodeBuffer
must point to a scratch pad memory, large enough for the store’s MaxMessageSize.
-
virtual void reset() override
Reset the stack (top-down), and drop all messages.
-
void source(StoreJournal &store)
Use this connection as a source of the given store.
-
Synchronizer &synchronizer() const
Returns the Synchronizer that manages this connection.
stored::Synchronizable
-
template<typename Base>
class Synchronizable : public Base
An extension of a store to be used by the stored::Synchronizer.
Assume you have MyStoreBase (which takes a template parameter Implementation), and the actual store implementation ActualStore. If it does not have to be synchronizable, you would inherit ActualStore from MyStoreBase<ActualStore>.
If it should be synchronizable, use Synchronizable like this:
    class ActualStore : public stored::Synchronizable<stored::MyStoreBase<ActualStore> > {
        STORED_CLASS_NOCOPY(ActualStore)
    public:
        typedef stored::Synchronizable<stored::MyStoreBase<ActualStore> > base;
        using typename base::Implementation;
        friend class stored::MyStoreBase<ActualStore>;
        ActualStore() is_default
        ...
    };
Or with a few macros:
    class ActualStore: public STORE_SYNC_BASE_CLASS(MyStoreBase, ActualStore) {
        STORE_SYNC_CLASS_BODY(MyStoreBase, ActualStore)
    public:
        ActualStore() is_default
    };
Now, ActualStore inherits from Synchronizable, which inherits from MyStoreBase. However, ActualStore is still used as the final implementation.
Subclassed by stored::ExampleFpga2DefaultFunctions< stored::Synchronizable< stored::ExampleFpga2Base< ExampleFpga2 > > >, stored::ExampleFpgaDefaultFunctions< stored::Synchronizable< stored::ExampleFpgaBase< ExampleFpga > > >
Public Types
-
enum [anonymous]
Values:
-
enumerator MaxMessageSize
Maximum size of any Synchronizer message for this store.
-
typedef base::Objects Objects
Public Functions
-
~Synchronizable() = default
-
inline StoreJournal &journal()
-
inline StoreJournal const &journal() const
-
inline operator StoreJournal&()
-
inline operator StoreJournal const&() const
-
inline void reserveHeap()
Reserve worst-case heap usage.
Afterwards, the store and synchronizer will not use any additional heap, which makes it possible to use them in a context where only async-signal-safe operations are allowed, like an interrupt handler.
Friends
- friend class TypedStoreCallback
-
class TypedStoreCallback : public stored::StoreJournal::StoreCallback
Public Functions
-
inline explicit TypedStoreCallback(Synchronizable &store)
-
~TypedStoreCallback() override = default
-
inline virtual bool doHookChanged() noexcept override
-
inline virtual bool doHookEntryRO() noexcept override
-
inline virtual bool doHookExitRO() noexcept override
-
inline virtual bool doHooks() noexcept override
-
inline virtual void hookChanged() noexcept override
-
inline virtual void hookChanged(Type::type type, void *buffer, size_t len) noexcept override
-
inline virtual void hookEntryRO() noexcept override
-
inline virtual void hookEntryRO(Type::type type, void *buffer, size_t len) noexcept override
-
inline virtual void hookExitRO() noexcept override
-
inline virtual void hookExitRO(Type::type type, void *buffer, size_t len) noexcept override
stored::Synchronizer
-
class Synchronizer
The service that manages synchronization of stores over SyncConnections.
A Synchronizer holds a set of stores, and a set of SyncConnections. A store can be synchronized over multiple connections simultaneously.
Public Functions
-
Synchronizer() = default
Ctor.
-
~Synchronizer()
Dtor.
-
SyncConnection const &connect(ProtocolLayer &connection)
Connect the given connection to this Synchronizer.
A stored::SyncConnection is instantiated on top of the given protocol stack. This SyncConnection is the OSI Application layer of the synchronization protocol.
- Returns:
the stored::SyncConnection, which is valid until disconnect() is called
-
void disconnect(ProtocolLayer &connection)
Disconnect the given connection from this Synchronizer.
Provide the connection as given to connect() before.
-
inline void *encodeBuffer()
Return a buffer large enough to encode messages in.
-
bool isSynchronizing(StoreJournal &j) const
-
bool isSynchronizing(StoreJournal &j, SyncConnection &notOverConnection) const
-
template<typename Store>
inline void map(Synchronizable<Store> &store)
Register a store in this Synchronizer.
-
void process()
Process updates for all connections and all stores.
-
void process(ProtocolLayer &connection)
Process updates for all stores on the given connection.
-
StoreJournal::Seq process(ProtocolLayer &connection, StoreJournal &j)
Process updates for the given store on the given connection.
-
template<typename Store>
inline StoreJournal::Seq process(ProtocolLayer &connection, Synchronizable<Store> &store)
Process updates for the given store on the given connection.
-
void process(StoreJournal &j)
Process updates for the given journal on all connections.
-
template<typename Store>
inline void process(Synchronizable<Store> &store)
Process updates for the given store on all connections.
-
template<typename Store>
inline void syncFrom(Synchronizable<Store> &store, ProtocolLayer &connection)
Mark the connection to be a source of the given store.
The full store’s buffer is received from the Synchronizer via the connection. Afterwards, updates are exchanged bidirectionally.
-
StoreJournal *toJournal(char const *hash) const
Find a registered store given a hash.
- Returns:
the store, or nullptr when not found
-
template<typename Store>
inline void unmap(Synchronizable<Store> &store)
Deregister a store from this Synchronizer.