Skip to content

Adapter Interface / How To Get Events Into The System

There are two different methods for generating events for StreamMine3G: Using 1. the generate() method which is called on a regular basis based on a defined timer interval, 2. the adapter interface which allows users to establish a TCP connection to some external (web)service to receive events from this kind of data source.

In either case, an event can be generated using the Collector object. The generated event will then appear in the process() method of the same operator.


Generate Method

The generate method, as the name implies is used to generate events for StreamMine3G. This method should be used if you want to pull data from arbitrary data sources using some client libraries, e.g., libmysql in case you want to extract data from some MySQL database or you can also open a simple file stream (e.g. csv file) and read records from that kind of data source.

The timer interval for the up-calls of the generate method can be specified using the following parameters when defining/creating an operator:

Json file of your simple static manager:

1
2
3
4
"someOperator" : 
{
   "timerInterval" : 1000000
}

or via the following call if you have your own custom manager implementation:

1
operatorConfig->setParameter((char*)TIMERINTERVAL, (char*)"1000000"); // every second
OPTIONKEY: TIMERINTERVAL / timerInterval
DEFAULT is: 1000000 (every one second) / allowed values: 0-? ( 0 = continuously call generate)

Depending on how many processingTasksGenerator you defined - see performance tuning, the generate method will be called x-many times with every interval that passed, hence, you will see 8 calls of the generate method with every second if processingTasksGenerator is set to 8 and the timerInterval set to 1000000.


Adapter Interface

StreamMine3G provides an adapter interface which allows you to connect to some TCP server (or have external services connected to StreamMine3G) to exchange raw bytes streams. Using this interface, protocols such as HTTP can be easily implemented, or alternatively, messages can be exchanged via Google protocol buffers.

The provided interface is asynchronous, i.e., it is first necessary to issue a read request once you are connected to an external entity

Such as the following:

1
2
3
4
5
6
7
void MyOperator::onConnectAdapter(int sourceId)
{
    std::cout << "On connect from source " << sourceId << "\n";

    char* buf = new char[1];
    cloudControl->readAdapter(buf, 1, sourceId);
}

and, once data is ready, an up-call of the onReadAdapter() method will be called.

TCP connections to external entities can be either actively established via the collector->connectTo() method call, or by having StreamMine3G to bind a TCP server to a specific IP-Address and port for accepting incoming connections.

With either method used, once an connection has been established, the onConnectAdapter() method of your operator will be called to issue read or write request depending on the desired protocol.

The adapter interface allows to operate on multiple connections simultaneously, hence, StreamMine3G can also be used to implement arbitrary services. Each established connection is uniquely identified by the sourceId parameter. This parameter is also used when issuing read or write calls on the adapter interface.