
Operator Interface

The central element of event processing in StreamMine3G is the operator class and its process() method. Every event that arrives from an upstream operator or through the adapter interface is passed to the operator as a parameter, together with a Collector object that is used to emit events to downstream operators.

Event Consumption

The event itself is passed as a void pointer (void*), together with its size and the routing key that was used to route it to the appropriate slice (see the partitioner interface).

In addition to the event, a void pointer (void*) to the state is provided if the operator is stateful; otherwise this pointer is set to NULL. See the documentation on stateful operators for more information on how to use them in StreamMine3G.

You must neither modify nor delete incoming events. StreamMine3G takes care of garbage-collecting them.
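The consumption rules above can be illustrated with a minimal sketch. It is not the StreamMine3G API: the function consumeEvent(), its parameter list, and the Reading and RunningSum structs are hypothetical stand-ins for what the body of a process() implementation might do. The sketch only assumes what is stated above, namely that the event arrives as raw bytes with a size and a routing key, and that the state pointer is NULL for stateless operators.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical event layout; the framework itself only sees raw bytes.
struct Reading {
    std::uint32_t sensorId;
    double value;
};

// Hypothetical per-slice state for a stateful operator.
struct RunningSum {
    std::uint64_t count = 0;
    double total = 0.0;
};

// Stand-in for the body of process(). Names and parameter types are
// assumptions made for this sketch, not the real signature.
// Returns true if the event could be decoded into `out`.
bool consumeEvent(const void* event, std::size_t size, std::uint32_t key,
                  void* state, Reading& out) {
    (void)key;  // the routing key is available but not needed here

    if (event == nullptr || size != sizeof(Reading)) {
        return false;  // unexpected payload, ignore it
    }

    // Copy the bytes out instead of writing through the pointer:
    // the incoming event must not be modified or deleted.
    std::memcpy(&out, event, sizeof(Reading));

    // The state pointer is only set for stateful operators.
    if (state != nullptr) {
        auto* sum = static_cast<RunningSum*>(state);
        sum->count += 1;
        sum->total += out.value;
    }
    return true;
}
```

Taking the event as a const void* in the sketch makes the read-only rule visible in the type; whether the real interface uses const here is not confirmed by this page.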

Event Dissemination / Collector Object

To emit an event, i.e., to send it to a downstream operator, use the allocEvent() or emitEvent() method of the Collector object. allocEvent() returns a pointer to a memory region allocated for the new event, which can be freely modified. The flush() / commit() methods tell StreamMine3G that modification of the event is complete, so the event can be committed, i.e., sent downstream to the next operator. emitEvent() can be used to forward incoming events unchanged, or to send user-allocated memory, such as portions of the state, downstream. Each emitEvent() call triggers an up-call of the onReleaseBuffer() method, which notifies the operator that the event has been sent downstream and that the event buffer can now be freed if it is user-allocated memory.
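The two emission paths can be sketched with an in-memory mock. MockCollector below is a hypothetical stand-in written for this example; the method names follow the text above, but the parameter lists, the callback type, and the synchronous release are assumptions, and the real Collector may behave differently (for example, it may release buffers asynchronously).

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for the Collector object.
class MockCollector {
public:
    using ReleaseCallback = std::function<void(const void*)>;

    explicit MockCollector(ReleaseCallback onRelease)
        : onRelease_(std::move(onRelease)) {}

    // Returns writable, collector-owned memory for a new event.
    void* allocEvent(std::size_t size) {
        pending_.assign(size, 0);
        hasPending_ = true;
        return pending_.data();
    }

    // Marks the allocated event as complete and "sends" it downstream.
    void commit() {
        assert(hasPending_ && "commit() without a preceding allocEvent()");
        sent_.emplace_back(pending_.begin(), pending_.end());
        hasPending_ = false;
    }

    // "Sends" caller-owned memory downstream, then reports the buffer as
    // released, mirroring the onReleaseBuffer() up-call described above.
    void emitEvent(const void* buffer, std::size_t size) {
        const char* bytes = static_cast<const char*>(buffer);
        sent_.emplace_back(bytes, bytes + size);
        if (onRelease_) onRelease_(buffer);
    }

    const std::vector<std::string>& sent() const { return sent_; }

private:
    ReleaseCallback onRelease_;
    std::vector<char> pending_;
    bool hasPending_ = false;
    std::vector<std::string> sent_;
};

// Path 1: build the new event in memory handed out by the collector.
void emitCopy(MockCollector& collector, const std::string& payload) {
    void* out = collector.allocEvent(payload.size());
    if (!payload.empty()) std::memcpy(out, payload.data(), payload.size());
    collector.commit();
}

// Path 2: hand over user-allocated memory. Ownership stays with the
// caller, who frees the buffer once the release callback fires.
void emitOwned(MockCollector& collector, const std::string& payload) {
    char* buffer = new char[payload.size()];
    if (!payload.empty()) std::memcpy(buffer, payload.data(), payload.size());
    collector.emitEvent(buffer, payload.size());
}
```

In this sketch the release callback would call delete[] on the buffer created in emitOwned(). When emitEvent() is used to forward an incoming event instead, the callback must not free anything, since incoming events are owned by StreamMine3G.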

Operator Instantiation

When the operator class is instantiated, its init() method is called with a sliceId, a sliceUId, and an optional custom parameter. The sliceId should be kept for use with the adapter interface, so that generated events are passed to the process() method of the correct slice. The optional parameter can be used to pass arbitrary data to the operator when it is created in the manager component. This makes it possible, for instance, to create a generic filter operator whose filtering predicates are passed in as a parameter.
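The generic filter idea can be sketched as follows. ThresholdFilter, the init() parameter list, and the encoding of the parameter as a decimal string are all assumptions made for illustration; the page does not specify how the custom parameter is typed or serialized.

```cpp
#include <cstdlib>

// Hypothetical generic filter whose predicate (a minimum value) is
// supplied through the optional init() parameter.
class ThresholdFilter {
public:
    // Assumed signature: slice identifiers plus an optional parameter,
    // here a C string holding a decimal threshold such as "10.5".
    void init(int sliceId, int sliceUId, const char* parameter) {
        sliceId_ = sliceId;    // kept for later use with the adapter interface
        sliceUId_ = sliceUId;
        if (parameter != nullptr) {
            threshold_ = std::strtod(parameter, nullptr);
        }
    }

    // The predicate that a process() implementation could apply per event.
    bool accepts(double value) const { return value >= threshold_; }

    int sliceId() const { return sliceId_; }
    int sliceUId() const { return sliceUId_; }

private:
    int sliceId_ = -1;
    int sliceUId_ = -1;
    double threshold_ = 0.0;  // default when no parameter is passed
};
```

With this shape, the same operator class can be deployed several times with different thresholds, and only the parameter passed at creation time changes.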