WordCount Example¶
The following page will describe how to implement the popular WordCount application on StreamMine3G using the JavaWrapper.
WordCount itself consists basically just of two operators, a mapper, that tokenizes the words in a text file/line/sentence and a reducer which counts all the words and hence provides a (continuously updating) distribution of occurrences of words.
Event Format¶
Events in StreamMine3G make up a simple key-value pair, where the key (an integer) can be used for partitioning and a payload for the actual data. The payload does not have any fixed format which gives users a maximum amount of flexibility when writing applications for StreamMine3G. Users are just provided with a byte array for the payload when they receive or emit events. Hence, users can use their favorite serialization framework such as Java Serialization, Google Protocol Buffers or Thrift for event representation. Since we are only using strings in this example, we can just use the byte array which we interpret as String for now which also greatly simplyfies the example.
Process Method¶
One central point when doing event processing in StreamMine3G is the process()
method. The process method is similar to the map or reduce method in MapReduce, hence, once an event arrives from some upstream operator, that event will be provided to the operator as a parameter during an upcall of the process method, accompanied with a Collector
object used to emit events to some downstream operator.
In addition to the routingKey, the event itself (byte array) and the Collector
object, the operator can also access state via the passed state object. This is useful when implementing stateful operators as we will see in the implementation of the reducer.
In the following, we will walk through the WordCount application code starting with the procces method of the mapper.
Mapper¶
In the process method of the mapper, we simply take the incoming event (sentence) and provide it to the Java StringTokenizer which splits up the sentence by spaces, hence, we can use a loop where we receive a single word with each iteration. During the iteration, we allocate/create a new event we want to have sent downstream and use the emitEvent() method to sent the newly created event downstream.
WordCountMapper.java
1 2 3 4 5 6 7 8 9 10 | public void process(int routingKey, byte[] event, Object state, Collector collector) { String sentence = new String(event); StringTokenizer st = new StringTokenizer(sentence); while (st.hasMoreElements()) { String word = st.nextToken(); collector.emitEvent(0, 0, word.getBytes()); } } |
Reducer¶
On the reducer side, we use a stateful operator, where the state simply makes up a hash map. With each arriving event where the payload comprises the word itself, we simply increment the entry in our map.
WordCountReducer.cpp
1 2 3 4 5 6 7 8 9 | public void process(int routingKey, byte[] event, Object state, Collector collector) { String word = new String(event); Integer count = new Integer(1); Map<String, Integer> map = (Map<String, Integer>)state; if (map.containsKey(word)) count = new Integer(map.get(word).intValue()+1); map.put(word, count); } |
How To Get Events Into The System?¶
One question, remains: How to get events into the system after all? StreamMine3G has an adapter interface to communicate via TCP/IP with external services which allows to transform events coming from external world into StreamMine3G events, as well as a generate()
method which is called on a periodic basis to generate()
events artificially.
In this example, we are simply using the generate()
method to generate random sentences for the Mapper.
Deploying And Running WordCount On StreamMine3G¶
To run our sample application on a StreamMine3G cluster, we simply use a prewritten manager implementation (libStreamMine3G-SimpleManagerStatic) which takes a topology defined in a json file and does the deployment of slices for us. The json-configuration file looks like this:
simpleManagerStaticConfig.json
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | { "startupDelay" : 1, "minimumPeers" : 2, "runtimeLimit" : 10, "operators" : { "wordCountMapper" : { "libraryPath" : "libStreamMine3G-JavaOperatorWrapper.so", "parameters" : "-Djava.library.path=/usr/lib:/usr/local/lib -Djava.class.path=../lib/streammine3G-interface-1.5.1.jar -Dstreammine3G.jars=WordCountExample.md.jar:../lib/log4j-1.2.17.jar:../lib/commons-lang-2.1.jar -Dstreammine3G.class=wordcount.WordCountMapper -Dlog4j.configuration=file:./log4j.properties", "timerInterval" : 1000000, "processingTasksGenerator" : 1, "processingTasksProcessor" : 1, "processingBatchSize" : 1 }, "wordCountReducer" : { "libraryPath" : "libStreamMine3G-JavaOperatorWrapper.so", "parameters" : "-Djava.library.path=/usr/lib:/usr/local/lib -Djava.class.path=../lib/streammine3G-interface-1.5.1.jar -Dstreammine3G.jars=WordCountExample.md.jar:../lib/log4j-1.2.17.jar:../lib/commons-lang-2.1.jar -Dstreammine3G.class=wordcount.WordCountReducer -Dlog4j.configuration=file:./log4j.properties", "processingTasksProcessor" : 1, "processingBatchSize" : 1 } }, "topologies" : { "myTopology" : { "wordCountMapper" : ["wordCountReducer"] } } } |
Running The Example¶
- Download & install zookeeper
- Download & install StreamMine3G
- Download & install StreamMine3G's SimpleStaticManager
- Download & install StreamMine3G's JavaWrapper
- Download the example source code from here.
- Run the example:
tar -xf streammine3g-wordcount-example-java.tgz cd StreamMine3G-WordCount-Example-Java ant cd run ./start.sh