
WordCount Example

This page describes how to implement the popular WordCount application on StreamMine3G.

WordCount consists of just two operators: a mapper, which tokenizes a text (a file, line, or sentence) into words, and a reducer, which counts the words and thereby provides a continuously updated distribution of word occurrences.


Event Format

Events in StreamMine3G are simple key-value pairs: the key (an integer) can be used for partitioning, and the payload carries the actual data. The payload has no fixed format, which gives users maximum flexibility when writing applications for StreamMine3G: when they receive or emit events, users are simply given a void* pointer to the payload together with its size. Hence, users can use their favorite serialization framework, such as Google Protocol Buffers or Thrift, for event representation. Since this example only uses strings, we can use the void* pointer as-is (as a C string), which greatly simplifies the example.
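To make the event layout concrete, here is a small sketch assuming a hypothetical SketchEvent struct (not part of the StreamMine3G API) that pairs an integer key with a raw byte payload. It illustrates the convention this example relies on: a word travels as a NUL-terminated C string, and the receiving side only gets a void* and a size.

```cpp
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in for a StreamMine3G event: an integer key used for
// partitioning plus an opaque payload of raw bytes. Not the real API.
struct SketchEvent {
    int key;
    std::vector<char> payload;
};

// Store a word as a NUL-terminated C string, the representation this example uses.
SketchEvent makeWordEvent(int key, const std::string& word) {
    SketchEvent e;
    e.key = key;
    e.payload.resize(word.size() + 1);
    std::memcpy(e.payload.data(), word.c_str(), word.size() + 1);  // copies the '\0' too
    return e;
}

// What an operator sees: only a void* and a size. Stop at the first '\0'
// if there is one, otherwise use the whole payload.
std::string readWord(const void* data, int size) {
    const char* chars = static_cast<const char*>(data);
    int len = 0;
    while (len < size && chars[len] != '\0') ++len;
    return std::string(chars, len);
}
```

With a serialization framework such as Protocol Buffers, the payload would instead hold the serialized message bytes; only the encode and decode steps change, not the void* plus size interface.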


Process Method

One central point of event processing in StreamMine3G is the process() method. It is similar to the map or reduce function in MapReduce: whenever an event arrives from an upstream operator, the runtime calls the operator's process() method with that event as a parameter, together with a Collector object that is used to emit events to downstream operators.

In addition to the routing key (routingKey), the event itself (void* pointer plus size), and the Collector object, the operator can also access its state via the given void* state pointer. This is useful when implementing stateful operators, as we will see in the implementation of the reducer.
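For trying out operator logic locally, a test double that records emitted events can help. The sketch below assumes a hypothetical MockCollector class that only mimics the allocEvent()/commit() pattern used by the mapper in this example; the meaning of allocEvent()'s first two arguments is not described on this page, so the mock ignores them. The hypothetical passThroughProcess() operator shows the parameters of process() in use: it keeps a counter in its state (assumed here to point to an int) and forwards each payload unchanged.

```cpp
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical test double, not the real StreamMine3G Collector.
class MockCollector {
public:
    void* allocEvent(int a, int b, int size) {
        (void)a;  // meaning not documented here; ignored by the mock
        (void)b;
        pending_.assign(static_cast<std::size_t>(size), '\0');
        return pending_.data();
    }
    // Records the most recently allocated event as emitted.
    void commit() { emitted_.push_back(pending_); }
    const std::vector<std::vector<char>>& emitted() const { return emitted_; }
private:
    std::vector<char> pending_;
    std::vector<std::vector<char>> emitted_;
};

// Illustrative stateful operator with the same parameter shape as process().
void passThroughProcess(int routingKey, void* event, int size, void* state, MockCollector* collector) {
    (void)routingKey;
    int* eventsSeen = static_cast<int*>(state);  // state: a simple counter
    ++(*eventsSeen);
    void* out = collector->allocEvent(0, 0, size);
    if (size > 0) std::memcpy(out, event, static_cast<std::size_t>(size));
    collector->commit();
}
```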

In the following, we walk through the WordCount application code, starting with the process() method of the mapper.

Mapper

In the mapper's process() method, we take the incoming event (a sentence) and pass it to the C library function strtok(), which splits the sentence at spaces, so that each loop iteration yields a single word. Because strtok() modifies the string it tokenizes, the payload is first copied into a temporary buffer. In each iteration, we allocate a new outgoing event, copy the extracted word (including its terminating '\0') into the event's memory, and send it downstream using the commit() method.

WordCountMapper.cpp

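#include <cstring>  // memcpy, strlen, strtok

void WordCountMapper::process(int routingKey, void* sentence, int size, void* state, Collector* collector)
{
    // strtok() modifies its input, so copy the payload into a NUL-terminated buffer first
    char* buffer = new char[size + 1];
    memcpy(buffer, sentence, size);
    buffer[size] = '\0';

    char* word = strtok(buffer, " ");
    while (word != NULL)
    {
        size_t length = strlen(word);
        // Allocate an outgoing event large enough for the word plus its '\0'
        char* eventOut = (char*)collector->allocEvent(0, 0, length + 1);
        memcpy(eventOut, word, length + 1);
        collector->commit();   // emit the word downstream
        word = strtok(NULL, " ");
    }
    delete[] buffer;  // release the temporary copy
}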
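The tokenization step can be checked in isolation. The sketch below reproduces the mapper's strtok() loop in a hypothetical free function splitWords(); the delims parameter is added for illustration (the mapper uses " " only). It shows two consequences of using strtok(): runs of spaces never produce empty words, and punctuation stays attached to words ("Hello," and "Hello" would be counted separately).

```cpp
#include <cstring>
#include <string>
#include <vector>

// Mirrors the mapper's tokenization outside StreamMine3G (hypothetical helper).
std::vector<std::string> splitWords(const char* sentence, int size, const char* delims) {
    std::vector<char> buffer(sentence, sentence + size);
    buffer.push_back('\0');  // strtok needs a writable, NUL-terminated string
    std::vector<std::string> words;
    for (char* w = std::strtok(buffer.data(), delims); w != nullptr;
         w = std::strtok(nullptr, delims)) {
        words.push_back(w);
    }
    return words;
}
```

Passing " \t" instead of " " would also split on tabs; normalizing punctuation would need an extra step before counting.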

Reducer

On the reducer side, we use a stateful operator whose state is simply a map (std::map) from words to their counts. For each arriving event, whose payload is the word itself, we increment that word's entry in the map.

WordCountReducer.cpp

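#include <map>
#include <string>

void WordCountReducer::process(int routingKey, void* word, int size, void* state, Collector* collector)
{
    // The operator state is a word -> count map; the payload is a NUL-terminated word
    typedef std::map<std::string, int> MAP;
    MAP* wordCountMap = (MAP*)state;
    // operator[] inserts a zero count for unseen words, so one increment suffices
    (*wordCountMap)[(char*)word]++;
}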
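The reducer's update can also be exercised without the runtime. The sketch below assumes a hypothetical free function countWord() that mirrors the body of WordCountReducer::process() and shows how the std::map state evolves as words arrive.

```cpp
#include <map>
#include <string>

typedef std::map<std::string, int> WordCountMap;

// Same update as WordCountReducer::process(), factored out for local testing.
// Assumes the payload is a NUL-terminated word and state points to a WordCountMap.
void countWord(void* word, void* state) {
    WordCountMap* counts = static_cast<WordCountMap*>(state);
    ++(*counts)[static_cast<char*>(word)];
}
```

Because std::map keeps its keys sorted, iterating over the state yields words in alphabetical order; std::unordered_map would be the hash-based alternative, with average constant-time updates but no ordering.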

How To Get Events Into The System?

One question remains: how do events get into the system in the first place? StreamMine3G provides an adapter interface for communicating with external services via TCP/IP, which allows transforming events from the outside world into StreamMine3G events. It also provides a generate() method, which is called periodically to generate events artificially. In this example, we simply use the generate() method to produce random sentences for the mapper.
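The exact signature of generate() is not shown on this page, so the sketch below covers only the part that is independent of it: a hypothetical helper makeRandomSentence() that builds one random sentence from a small, illustrative vocabulary. A generate() implementation could then emit the result with the same allocEvent()/commit() pattern the mapper uses; that wiring is an assumption, not shown here.

```cpp
#include <cstddef>
#include <random>
#include <string>
#include <vector>

// Hypothetical helper: builds a sentence of wordCount words separated by single spaces.
std::string makeRandomSentence(std::mt19937& rng, int wordCount) {
    static const std::vector<std::string> vocabulary = {
        "stream", "event", "operator", "mapper", "reducer", "word", "count"};
    std::uniform_int_distribution<std::size_t> pick(0, vocabulary.size() - 1);
    std::string sentence;
    for (int i = 0; i < wordCount; ++i) {
        if (i > 0) sentence += ' ';
        sentence += vocabulary[pick(rng)];
    }
    return sentence;
}
```

Seeding the generator with a fixed value makes runs reproducible, which helps when comparing the reducer's counts across runs.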


Deploying And Running WordCount On StreamMine3G

To run our sample application on a StreamMine3G cluster, we use a prewritten manager implementation (libStreamMine3G-SimpleManagerStatic), which takes a topology defined in a JSON file and deploys the slices for us. The JSON configuration file looks like this:

simpleManagerStaticConfig.json

{
    "startupDelay" : 1,
    "minimumPeers" : 2,
    "runtimeLimit" : 25,
    "operators" : 
    {
        "wordCountMapper" : 
        {
            "libraryPath"   : "../build/libWordCountMapper.so",
            "timerInterval" : 1000000,
            "processingTasksGenerator" : 1,
            "processingTasksProcessor" : 1,
            "processingBatchSize"      : 1
        },
        "wordCountReducer" : 
        {
            "libraryPath" : "../build/libWordCountReducer.so",
            "processingTasksProcessor" : 1,
            "processingBatchSize"      : 1
        }
    },
    "topologies" :
    {
        "myTopology" :
        {
            "wordCountMapper" : ["wordCountReducer"]
        }
    }
}

Running The Example

  1. Download & install zookeeper
  2. Download & install StreamMine3G
  3. Download & install StreamMine3G's SimpleManagerStatic
  4. Download the example source code from here.
  5. Run the example:
    tar -xf streammine3g-wordcount-example.tgz
    cd StreamMine3G-WordCount-Example
    mkdir build
    cd build
    cmake ..
    make
    cd ../run
    ./start.sh
    

Some Final Thoughts For Experienced Users

The above example runs single-threaded only. To run it multi-threaded, one would need to make the tokenization thread safe, since strtok() keeps internal state between calls. See stateful operators.
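One way to remove strtok()'s hidden state is to tokenize with objects that are local to each call. The sketch below uses std::istringstream in a hypothetical helper tokenizeThreadSafe(); note that it splits on any whitespace (spaces, tabs, newlines), not only on spaces. POSIX strtok_r(), which keeps its position in a caller-supplied pointer, is another option on platforms that provide it. This addresses only the tokenizer, not any state shared between threads.

```cpp
#include <sstream>
#include <string>
#include <vector>

// Tokenizes using only local objects, so concurrent calls share no hidden state.
std::vector<std::string> tokenizeThreadSafe(const char* sentence, int size) {
    std::string text(sentence, static_cast<std::string::size_type>(size));
    // Drop a terminating '\0' (and anything after it) if the payload size includes it
    std::string::size_type nul = text.find('\0');
    if (nul != std::string::npos) text.resize(nul);

    std::istringstream in(text);
    std::vector<std::string> words;
    std::string word;
    while (in >> word) {  // operator>> skips any run of whitespace
        words.push_back(word);
    }
    return words;
}
```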