Skip to content

Stateful Operators

StreamMine3G supports the implementation of stateful operators such as joins, aggregates etc., this includes consistent state access, hence, the user does not need to implement his own locking scheme as well as state persistence. StreamMine3G performs periodic checkpointing so that the (accumulated) state of an operator is not lost during node crashes (see fault tolerance) as well as assuring consistency when slices are migrated to other StreamMine3G nodes.

State Access & Partitioning

In addition to partitioning of operators, StreamMine3G supports also state partitioning. This is not to mix with operator partitioning: An operator can be partitioned into n-slices, where each slice has its own state. However, the state of a slice can be partitioned as well.

Why state partitioning?

Whenever a stateful operator receives an event, the state must be locked in order to ensure consistency, i.e., prevent concurrent modifications of the state's internal data structures. However, when locking state, no other event that needs access to the same state can be processed in parallel resulting in a sequential processing of events. Generally this is not a problem as more than one slice is usually deployed on a node, hence utilizing all cores of the host system. However, in cases where only one slice of a stateful operator is deployed on a multi-core system, this will result in single core processing.[[BR]] To overcome this problem, the operator state of a slice can be partitioned as well. In the following, we will call a partition of the state a bucket. The number of buckets can be determined by the STATEACCESSBUCKETS parameter.

Similar to the partitioner used to decide to which slice (through sliceId) events should be sent to, there is also a "state routing", i.e., stateAccess() method that is called shortly prior the process() method upcall in order to decide which state bucket should be used for the incoming event. State buckets have Ids starting from 0... hence, the stateAccess->setKey(x) call in the stateAccess() specifies which state bucket to use in the subsequent process() call. If the operator should not use any state, simply pass -1 to setKey.

1
void stateAccess(int routingKey, void* event, int size, StateAccess* stateAccess);

In addition to specifying the Id of the state bucket to use, the lock type can specified as well. The state can be locked either via a mutex-, a spin- or a readWrite-lock.

Since the state is partitioned in buckets, the stateInit() and freeState() method will be called as many times as buckets for the operator had been specified.

State (De-)serialization

State persistence as well as slice migration require the serialization and de-serialization of the state's internal data structure, hence, the serializeState() as well as deserializeState() methods must be implemented when using stateful operators in StreamMine3G.

The methods are called whenever a slice migration is occurring within the StreamMine3G cluster or the state is read/written from/to stable storage. In case the data structure exists already in a serialized form, i.a., as a flat memory structure, state persistence can be achieved with zero memory copying, i.e., just by returning a list of pointers to those memory regions including its lengths. In all other cases, any serialization framework can be used such as boost serialization to transform the complex memory structures into a flat ones and vice versa. In the latter case, temporarily acquired buffers for serialization can be free'ed in the onReleaseBuffer() method of the operator.