path: root/posts/storm.org
Diffstat (limited to 'posts/storm.org')
 -rw-r--r--  posts/storm.org  615
 1 file changed, 615 insertions(+), 0 deletions(-)
diff --git a/posts/storm.org b/posts/storm.org
new file mode 100644
index 0000000..629f5a3
--- /dev/null
+++ b/posts/storm.org
@@ -0,0 +1,615 @@
+#+TITLE: Real-Time Streaming with Apache Storm and Apache Kafka
+#+DESCRIPTION: Overview of Apache Storm and sample Twitter Sentiment Analysis
+#+TAGS: Apache Storm
+#+TAGS: Apache Kafka
+#+TAGS: Apache
+#+TAGS: Java
+#+TAGS: Sentiment Analysis
+#+TAGS: Real-time streaming
+#+TAGS: zData Inc.
+#+DATE: 2014-07-16
+#+SLUG: real-time-streaming-storm-and-kafka
+#+LINK: kafka http://kafka.apache.org/
+#+LINK: kafka-benchmark http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
+#+LINK: storm https://storm.apache.org/
+#+LINK: storm-multi-lingual https://storm.apache.org/about/multi-language.html
+#+LINK: storm-integrates https://storm.apache.org/about/integrates.html
+#+LINK: storm-common-patterns https://storm.apache.org/releases/current/Common-patterns.html
+#+LINK: docker http://www.docker.io/
+#+LINK: supervisord http://supervisord.org/
+#+LINK: storm-kafka-spout-github https://github.com/apache/incubator-storm/tree/master/external/storm-kafka
+#+LINK: wiki-stemming http://en.wikipedia.org/wiki/Stemming
+#+LINK: wiki-bag-of-words http://en.wikipedia.org/wiki/Bag-of-words_model
+#+LINK: storm-docs-stream-grouping http://storm.incubator.apache.org/documentation/Concepts.html#stream-groupings
+#+LINK: jackson-databind https://github.com/FasterXML/jackson-databind
+#+LINK: storm-trident-overview http://storm.incubator.apache.org/documentation/Trident-API-Overview.html
+#+LINK: wiki-srp http://en.wikipedia.org/wiki/Single_responsibility_principle
+#+LINK: storm-sample-project https://github.com/zdata-inc/StormSampleProject
+#+LINK: storm-incubation-proposal https://wiki.apache.org/incubator/StormProposal
+
+#+BEGIN_PREVIEW
+The following post is one in a series on real-time systems tangential to the
+Hadoop ecosystem. It explores Apache Storm and Apache Kafka as part of a
+real-time processing engine. These two systems work together very well and
+make for an easy development experience while still being very performant.
+#+END_PREVIEW
+
+** About Kafka
+
+[[kafka][Apache Kafka]] is a message queue rethought as a distributed commit
+log. It follows the publish-subscribe messaging style, with speed and
+durability built in.
+
+Kafka uses Zookeeper to share and save state between brokers. Each broker
+maintains a set of partitions, primary and/or secondary, for each topic. A set
+of Kafka brokers working together will maintain a set of topics. Each topic
+has its partitions distributed over the participating Kafka brokers and, as of
+Kafka version 0.8, the replication factor determines, intuitively, the number
+of times a partition is duplicated for fault tolerance.
+
+While many brokered message queue systems have the broker maintain the state of
+its consumers, Kafka does not. This frees up resources for the broker to
+ingest data faster. For more information about Kafka's performance, see
+[[kafka-benchmark][Benchmarking Kafka]].
+
+*** Initial Thoughts
+
+Kafka is a very promising project, with astounding throughput and one of the
+easiest pieces of software I have had the joy of installing and configuring.
+Although Kafka is not at the production 1.0 stable release yet, it's well on
+its way.
+
+** About Storm
+
+[[storm][Apache Storm]], currently in incubation, is a real-time computational
+engine made available under the free and open-source Apache License, Version
+2.0. It runs continuously, consuming data from the configured sources (Spouts)
+and passing the data down the processing pipeline (Bolts). Combined, Spouts
+and Bolts make a Topology. A topology can be written in any language,
+including any JVM-based language, Python, Ruby, Perl, or, with some work, even
+C. See the [[storm][Storm]] [[storm-multi-lingual][multi-lingual]]
+documentation.
+
+*** Why Storm
+
+Quoting from the project site:
+
+#+BEGIN_QUOTE
+ Storm has many use cases: realtime analytics, online machine learning,
+ continuous computation, distributed RPC, ETL, and more. Storm is fast:
+ a benchmark clocked it at over a million tuples processed per second
+ per node. It is scalable, fault-tolerant, guarantees your data will be
+ processed, and is easy to set up and operate.
+ [[storm][Storm Homepage]]
+#+END_QUOTE
+
+*** Integration
+
+Storm can integrate with any queuing and any database system. In fact, there
+are already quite a few existing projects that integrate Storm with other
+systems, like [[storm-integrates][Kestrel or Kafka]].
+
+*** Initial Thoughts
+
+I found Storm's verbiage around the computational pipeline to fit my mental
+model very well; thinking about streaming computational processes as directed
+acyclic graphs makes a lot of intuitive sense. That said, although I haven't
+been developing against Storm for very long, I do find some integration tasks
+to be slightly awkward. For example, writing an HDFS file writer bolt requires
+some special considerations given bolt life cycles and HDFS writing patterns.
+This is really only a minor blemish, however, as it only means developers of
+Storm topologies have to understand the API more intimately; there are already
+common patterns emerging that should be adaptable to just about any
+[[storm-common-patterns][situation]].
+
+** Test Project: Twitter Stream Sentiment Analysis
+
+To really give Storm a try, something a little more involved than just a simple
+word counter is needed. Therefore, I have put together a Twitter Sentiment
+Analysis topology. Though this is a good representative example of a more
+complicated topology, the method used for actually scoring the Twitter data is
+overly simple.
+
+*** Setup
+
+The setup used for this demo is a 5-node Vagrant virtual cluster. Each node
+runs 64-bit CentOS 6.5 and is given 1 core and 1024MB of RAM. Every node runs
+HDFS (datanode), Zookeeper, and Kafka. The first node, ~node0~, is the
+namenode and runs Nimbus, Storm's master daemon. ~node0~ is also running a
+[[docker][Docker]] container with a NodeJS application, part of the reporting
+process. The remaining nodes, ~node[1-4]~, are Storm worker nodes. Storm,
+Kafka, and Zookeeper are all run under supervision via
+[[supervisord][Supervisord]], so failed daemons are restarted automatically,
+baking a basic level of high availability into this virtual cluster.
+
+*** Overview
+
+[[file:/media/SentimentAnalysisTopology.png]]
+
+I wrote a simple Kafka producer that reads files off disk and sends them to
+the Kafka cluster. This producer feeds the whole system and is used in lieu
+of opening a stream to Twitter.
+
+**** Spout
+
+The orange node in the diagram is our
+[[storm-kafka-spout-github][~KafkaSpout~]] that will be consuming incoming
+messages from the Kafka brokers.
+
+**** Twitter Data JSON Parsing
+
+The first bolt, ~2~ in the image, attempts to parse the Twitter JSON data and
+emits =tweet_id= and =tweet_text=. This implementation only processes English
+tweets. If the topology were more ambitious, it might pass the language code
+down and create different scoring bolts for each language.
+
+**** Filtering and Stemming
+
+This next bolt, ~3~, performs first-round data sanitization: it removes all
+non-alphabetic characters and lowercases the text.
+
+The next round of data cleansing, ~4~, removes common words to reduce noise
+for the classifiers. These common words are usually referred to as /stop
+words/. [[wiki-stemming][/Stemming/]], reducing words to their root form by
+stripping suffixes, could also be performed here, or in another, later bolt.
+
+~4~, when finished, will then broadcast the data to both of the classifiers.
+
+**** Classifiers
+
+Each classifier is defined by its own bolt. One classifier for the positive
+class, ~5~, and another for the negative class, ~6~.
+
+The implementation of the classifiers follows the
+[[wiki-bag-of-words][Bag-of-words]] model.
+
+**** Join and Scoring
+
+Next, bolt ~7~ joins the scores from the two previous classifiers. The
+implementation of this bolt isn't perfect. For example, message transactions
+are not guaranteed: if one side of the join fails, neither side is notified.
+
+To finish up the scoring, bolt ~8~ compares the scores from the classifiers and
+declares the class accordingly.
+
+**** Reporting: HDFS and HTTP POST
+
+Finally, the scoring bolt broadcasts the results to an HDFS file writer bolt,
+~9~, and a NodeJS notifier bolt, ~10~. The HDFS bolt fills a list until it
+holds 1000 records and then spools the batch to disk. Of course, like the
+join bolt, this could be better implemented: it could fail input tuples when
+the write to disk fails, or flush to disk after a certain period of time.
+The NodeJS notifier bolt, on the other hand, sends an HTTP POST each time a
+record is received. This could be batched as well, but again, this is for
+demonstration purposes only.
+
+*** Implementing the Kafka Producer
+
+Here are the main portions of the code for the Kafka producer that was used
+to test our cluster.
+
+In the main class, we set up the data pipes and threads:
+
+#+BEGIN_SRC java
+ LOGGER.debug("Setting up streams");
+ PipedInputStream send = new PipedInputStream(BUFFER_LEN);
+ PipedOutputStream input = new PipedOutputStream(send);
+
+ LOGGER.debug("Setting up connections");
+ LOGGER.debug("Setting up file reader");
+ BufferedFileReader reader = new BufferedFileReader(filename, input);
+ LOGGER.debug("Setting up kafka producer");
+ KafkaProducer kafkaProducer = new KafkaProducer(topic, send);
+
+ LOGGER.debug("Spinning up threads");
+ Thread source = new Thread(reader);
+ Thread kafka = new Thread(kafkaProducer);
+
+ source.start();
+ kafka.start();
+
+ LOGGER.debug("Joining");
+ kafka.join();
+#+END_SRC
+
+The ~BufferedFileReader~, in its own thread, reads the data off disk:
+
+#+BEGIN_SRC java
+ rd = new BufferedReader(new FileReader(this.fileToRead));
+ wd = new BufferedWriter(new OutputStreamWriter(this.outputStream, ENC));
+ int b = -1;
+ while ((b = rd.read()) != -1)
+ {
+ wd.write(b);
+ }
+#+END_SRC
+
+Finally, the ~KafkaProducer~ sends asynchronous messages to the Kafka Cluster:
+
+#+BEGIN_SRC java
+ rd = new BufferedReader(new InputStreamReader(this.inputStream, ENC));
+ String line = null;
+ producer = new Producer<Integer, String>(conf);
+ while ((line = rd.readLine()) != null)
+ {
+ producer.send(new KeyedMessage<Integer, String>(this.topic, line));
+ }
+#+END_SRC
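+
+The ~conf~ object passed to the ~Producer~ above is a standard Kafka 0.8
+~ProducerConfig~. A minimal sketch of how it might be built; the broker
+addresses are illustrative and the sample project may set more options:
+
+#+BEGIN_SRC java
+    Properties props = new Properties();
+    // Brokers used to bootstrap cluster metadata (illustrative addresses).
+    props.put("metadata.broker.list", "node0:9092,node1:9092");
+    // Encode message payloads as strings.
+    props.put("serializer.class", "kafka.serializer.StringEncoder");
+    // Queue messages and send them on a background thread.
+    props.put("producer.type", "async");
+    ProducerConfig conf = new ProducerConfig(props);
+#+END_SRC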
+
+Doing these operations on separate threads keeps disk reads from blocking the
+Kafka producer, and vice versa, enabling maximum performance, tunable by the
+size of the buffer.
+
+*** Implementing the Storm Topology
+
+**** Topology Definition
+ :PROPERTIES:
+ :CUSTOM_ID: topology-definition
+ :END:
+
+Moving onward to Storm, here we define the topology and how the spout and
+bolts talk to each other:
+
+#+BEGIN_SRC java
+ TopologyBuilder topology = new TopologyBuilder();
+
+ topology.setSpout("kafka_spout", new KafkaSpout(kafkaConf), 4);
+
+ topology.setBolt("twitter_filter", new TwitterFilterBolt(), 4)
+ .shuffleGrouping("kafka_spout");
+
+ topology.setBolt("text_filter", new TextFilterBolt(), 4)
+ .shuffleGrouping("twitter_filter");
+
+ topology.setBolt("stemming", new StemmingBolt(), 4)
+ .shuffleGrouping("text_filter");
+
+ topology.setBolt("positive", new PositiveSentimentBolt(), 4)
+ .shuffleGrouping("stemming");
+ topology.setBolt("negative", new NegativeSentimentBolt(), 4)
+ .shuffleGrouping("stemming");
+
+ topology.setBolt("join", new JoinSentimentsBolt(), 4)
+ .fieldsGrouping("positive", new Fields("tweet_id"))
+ .fieldsGrouping("negative", new Fields("tweet_id"));
+
+ topology.setBolt("score", new SentimentScoringBolt(), 4)
+ .shuffleGrouping("join");
+
+ topology.setBolt("hdfs", new HDFSBolt(), 4)
+ .shuffleGrouping("score");
+ topology.setBolt("nodejs", new NodeNotifierBolt(), 4)
+ .shuffleGrouping("score");
+#+END_SRC
+
+Notably, the data is shuffled to each bolt except when joining, as it's very
+important that the same tweets are given to the same instance of the joining
+bolt; this is what the ~fieldsGrouping~ on ~tweet_id~ accomplishes. To read
+more about stream groupings, visit the
+[[storm-docs-stream-grouping][Storm documentation]].
+
+**** Twitter Data Filter / Parser
+
+Parsing the Twitter JSON data is one of the first things that needs to be done.
+This is accomplished with the use of the [[jackson-databind][Jackson
+Databind]] library.
+
+#+BEGIN_SRC java
+ JsonNode root = mapper.readValue(json, JsonNode.class);
+ long id;
+ String text;
+ if (root.get("lang") != null &&
+ "en".equals(root.get("lang").textValue()))
+ {
+ if (root.get("id") != null && root.get("text") != null)
+ {
+ id = root.get("id").longValue();
+ text = root.get("text").textValue();
+ collector.emit(new Values(id, text));
+ }
+ else
+ LOGGER.debug("tweet id and/ or text was null");
+ }
+ else
+ LOGGER.debug("Ignoring non-english tweet");
+#+END_SRC
+
+As mentioned before, ~tweet_id~ and ~tweet_text~ are the only values being
+emitted.
+
+This is just using the basic ~ObjectMapper~ class from the Jackson Databind
+library to map the simple JSON object provided by the Twitter Streaming API to
+a ~JsonNode~. The language code is first tested to be English, as the topology
+does not support non-English tweets. The new tuple is emitted down the Storm
+pipeline after ensuring the existence of the desired data, namely ~tweet_id~
+and ~tweet_text~.
+
+**** Text Filtering
+
+As previously mentioned, punctuation and other symbols are removed because they
+are just noise to the classifiers:
+
+#+BEGIN_SRC java
+ Long id = input.getLong(input.fieldIndex("tweet_id"));
+ String text = input.getString(input.fieldIndex("tweet_text"));
+ text = text.replaceAll("[^a-zA-Z\\s]", "").trim().toLowerCase();
+ collector.emit(new Values(id, text));
+#+END_SRC
+
+/Very/ common words are also removed because they are similarly noisy to the
+classifiers:
+
+#+BEGIN_SRC java
+ Long id = input.getLong(input.fieldIndex("tweet_id"));
+ String text = input.getString(input.fieldIndex("tweet_text"));
+ List<String> stopWords = StopWords.getWords();
+    for (String word : stopWords)
+    {
+        // Match on word boundaries so stop words embedded in longer words
+        // are left intact (e.g. "is" inside "this").
+        text = text.replaceAll("\\b" + word + "\\b", "");
+    }
+ collector.emit(new Values(id, text));
+#+END_SRC
+
+Here the ~StopWords~ class is a singleton holding the list of words we wish to
+omit.
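+
+A minimal sketch of what such a singleton might look like; the word list here
+is an illustrative subset, and the sample project's loading mechanism may
+differ:
+
+#+BEGIN_SRC java
+    import java.util.Arrays;
+    import java.util.List;
+
+    public final class StopWords
+    {
+        private static List<String> words;
+
+        private StopWords() { }
+
+        public static synchronized List<String> getWords()
+        {
+            if (words == null)
+            {
+                // A real list would be much longer and likely read from a
+                // resource file.
+                words = Arrays.asList("the", "be", "to", "of", "and", "a", "in");
+            }
+            return words;
+        }
+    }
+#+END_SRC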
+
+**** Positive and Negative Scoring
+
+Since the approach for scoring is based on a very limited
+[[wiki-bag-of-words][Bag-of-words]] model, each class is put into its own
+bolt; this also contrives the need for the join later.
+
+Positive Scoring:
+
+#+BEGIN_SRC java
+ Long id = input.getLong(input.fieldIndex("tweet_id"));
+ String text = input.getString(input.fieldIndex("tweet_text"));
+ Set<String> posWords = PositiveWords.getWords();
+ String[] words = text.split(" ");
+ int numWords = words.length;
+ int numPosWords = 0;
+ for (String word : words)
+ {
+ if (posWords.contains(word))
+ numPosWords++;
+ }
+ collector.emit(new Values(id, (float) numPosWords / numWords, text));
+#+END_SRC
+
+Negative Scoring:
+
+#+BEGIN_SRC java
+ Long id = input.getLong(input.fieldIndex("tweet_id"));
+ String text = input.getString(input.fieldIndex("tweet_text"));
+ Set<String> negWords = NegativeWords.getWords();
+ String[] words = text.split(" ");
+ int numWords = words.length;
+ int numNegWords = 0;
+ for (String word : words)
+ {
+ if (negWords.contains(word))
+ numNegWords++;
+ }
+ collector.emit(new Values(id, (float)numNegWords / numWords, text));
+#+END_SRC
+
+Similar to ~StopWords~, ~PositiveWords~ and ~NegativeWords~ are both singletons
+maintaining lists of positive and negative words, respectively.
+
+**** Joining Scores
+
+Joining in Storm isn't the most straightforward process to implement,
+although the process may be made simpler with the use of the
+[[storm-trident-overview][Trident API]]. However, to get a feel for what to
+do without Trident, and as an academic exercise, the join is not implemented
+with the Trident API. On a related note, this join isn't perfect! It ignores
+a couple of issues: it does not correctly fail a tuple on a one-sided join
+(when a tweet is received twice from the same scoring bolt), and it doesn't
+time out tweets left in the queue for too long. With this in mind, here is
+our join:
+
+#+BEGIN_SRC java
+ Long id = input.getLong(input.fieldIndex("tweet_id"));
+ String text = input.getString(input.fieldIndex("tweet_text"));
+ if (input.contains("pos_score"))
+ {
+ Float pos = input.getFloat(input.fieldIndex("pos_score"));
+ if (this.tweets.containsKey(id))
+ {
+ Triple<String, Float, String> triple = this.tweets.get(id);
+ if ("neg".equals(triple.getCar()))
+ emit(collector, id, triple.getCaar(), pos, triple.getCdr());
+ else
+ {
+ LOGGER.warn("one sided join attempted");
+ this.tweets.remove(id);
+ }
+ }
+ else
+ this.tweets.put(
+ id,
+ new Triple<String, Float, String>("pos", pos, text));
+ }
+ else if (input.contains("neg_score"))
+ {
+ Float neg = input.getFloat(input.fieldIndex("neg_score"));
+ if (this.tweets.containsKey(id))
+ {
+ Triple<String, Float, String> triple = this.tweets.get(id);
+        if ("pos".equals(triple.getCar()))
+            // Argument order must match emit(collector, id, text, pos, neg):
+            // the stored score here is the positive one, the incoming score
+            // the negative one.
+            emit(collector, id, triple.getCaar(), triple.getCdr(), neg);
+ else
+ {
+ LOGGER.warn("one sided join attempted");
+ this.tweets.remove(id);
+ }
+ }
+ else
+ this.tweets.put(
+ id,
+ new Triple<String, Float, String>("neg", neg, text));
+ }
+#+END_SRC
+
+Where ~emit~ is defined simply by:
+
+#+BEGIN_SRC java
+ private void emit(
+ BasicOutputCollector collector,
+ Long id,
+ String text,
+ float pos,
+ float neg)
+ {
+ collector.emit(new Values(id, pos, neg, text));
+ this.tweets.remove(id);
+ }
+#+END_SRC
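+
+The ~Triple~ class is a small container from the sample project. Here is a
+minimal sketch consistent with how it is used above, keeping the project's
+Lisp-flavored accessor names; the real implementation may differ:
+
+#+BEGIN_SRC java
+    public class Triple<A, B, C>
+    {
+        private final A car;   // class label, e.g. "pos" or "neg"
+        private final B cdr;   // the stored score
+        private final C caar;  // the tweet text
+
+        public Triple(A car, B cdr, C caar)
+        {
+            this.car = car;
+            this.cdr = cdr;
+            this.caar = caar;
+        }
+
+        public A getCar() { return this.car; }
+        public B getCdr() { return this.cdr; }
+        public C getCaar() { return this.caar; }
+    }
+#+END_SRC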
+
+**** Deciding the Winning Class
+
+To ensure the [[wiki-srp][Single responsibility principle]] is not violated,
+joining and scoring are split into separate bolts, though the class of the
+tweet could certainly be decided at the time of joining.
+
+#+BEGIN_SRC java
+ Long id = tuple.getLong(tuple.fieldIndex("tweet_id"));
+ String text = tuple.getString(tuple.fieldIndex("tweet_text"));
+ Float pos = tuple.getFloat(tuple.fieldIndex("pos_score"));
+ Float neg = tuple.getFloat(tuple.fieldIndex("neg_score"));
+ String score = pos > neg ? "positive" : "negative";
+ collector.emit(new Values(id, text, pos, neg, score));
+#+END_SRC
+
+This decider will prefer negative scores, so if there is a tie, it's
+automatically handed to the negative class.
+
+**** Report the Results
+
+Finally, there are two bolts that will yield results: one that writes data to
+HDFS, and another to send the data to a web server.
+
+#+BEGIN_SRC java
+ Long id = input.getLong(input.fieldIndex("tweet_id"));
+ String tweet = input.getString(input.fieldIndex("tweet_text"));
+ Float pos = input.getFloat(input.fieldIndex("pos_score"));
+ Float neg = input.getFloat(input.fieldIndex("neg_score"));
+ String score = input.getString(input.fieldIndex("score"));
+ String tweet_score =
+ String.format("%s,%s,%f,%f,%s\n", id, tweet, pos, neg, score);
+ this.tweet_scores.add(tweet_score);
+ if (this.tweet_scores.size() >= 1000)
+ {
+ writeToHDFS();
+ this.tweet_scores = new ArrayList<String>(1000);
+ }
+#+END_SRC
+
+Where ~writeToHDFS~ is primarily given by:
+
+#+BEGIN_SRC java
+ Configuration conf = new Configuration();
+ conf.addResource(new Path("/opt/hadoop/etc/hadoop/core-site.xml"));
+ conf.addResource(new Path("/opt/hadoop/etc/hadoop/hdfs-site.xml"));
+ hdfs = FileSystem.get(conf);
+ file = new Path(
+ Properties.getString("rts.storm.hdfs_output_file") + this.id);
+ if (hdfs.exists(file))
+ os = hdfs.append(file);
+ else
+ os = hdfs.create(file);
+ wd = new BufferedWriter(new OutputStreamWriter(os, "UTF-8"));
+ for (String tweet_score : tweet_scores)
+ {
+ wd.write(tweet_score);
+    }
+    // Close the writer so the buffered batch is actually flushed to HDFS.
+    wd.close();
+#+END_SRC
+
+And our ~HTTP POST~ to a web server:
+
+#+BEGIN_SRC java
+ Long id = input.getLong(input.fieldIndex("tweet_id"));
+ String tweet = input.getString(input.fieldIndex("tweet_text"));
+ Float pos = input.getFloat(input.fieldIndex("pos_score"));
+ Float neg = input.getFloat(input.fieldIndex("neg_score"));
+ String score = input.getString(input.fieldIndex("score"));
+ HttpPost post = new HttpPost(this.webserver);
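+    // Naive JSON construction for demo purposes: the tweet text is not escaped.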
+ String content = String.format(
+ "{\"id\": \"%d\", " +
+ "\"text\": \"%s\", " +
+ "\"pos\": \"%f\", " +
+ "\"neg\": \"%f\", " +
+ "\"score\": \"%s\" }",
+ id, tweet, pos, neg, score);
+
+ try
+ {
+ post.setEntity(new StringEntity(content));
+ HttpResponse response = client.execute(post);
+ org.apache.http.util.EntityUtils.consume(response.getEntity());
+ }
+ catch (Exception ex)
+ {
+ LOGGER.error("exception thrown while attempting post", ex);
+ LOGGER.trace(null, ex);
+ reconnect();
+ }
+#+END_SRC
+
+There are some faults to point out with both of these bolts. Namely, the HDFS
+bolt batches writes into groups of 1000 tweets, but it does not keep track of
+the tuples, time them out, or flush them at some interval. That is, if a
+write fails or if the queue sits idle for too long, the topology is not
+notified and can't resend the tuples. Similarly, the ~HTTP POST~ does not
+batch and sends each POST synchronously, causing the bolt to block for each
+message. This can be alleviated with more instances of this bolt and more web
+servers to handle the increase in posts, and/or a better batching process.
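+
+One way to address the flush-interval problem is Storm's built-in tick
+tuples. The following is a sketch only, assuming the Storm 0.9.x
+(~backtype.storm~) API; the class name and the 30-second interval are
+illustrative, not part of the sample project:
+
+#+BEGIN_SRC java
+    import java.util.HashMap;
+    import java.util.Map;
+
+    import backtype.storm.Config;
+    import backtype.storm.Constants;
+    import backtype.storm.topology.BasicOutputCollector;
+    import backtype.storm.topology.OutputFieldsDeclarer;
+    import backtype.storm.topology.base.BaseBasicBolt;
+    import backtype.storm.tuple.Tuple;
+
+    public class FlushingHDFSBolt extends BaseBasicBolt
+    {
+        @Override
+        public Map<String, Object> getComponentConfiguration()
+        {
+            // Ask Storm to send this bolt a tick tuple every 30 seconds.
+            Map<String, Object> conf = new HashMap<String, Object>();
+            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 30);
+            return conf;
+        }
+
+        @Override
+        public void execute(Tuple input, BasicOutputCollector collector)
+        {
+            if (isTickTuple(input))
+            {
+                // Flush whatever has accumulated, even if under 1000 records.
+                writeToHDFS();
+                return;
+            }
+            // ... the batching logic from the HDFS bolt above ...
+        }
+
+        private static boolean isTickTuple(Tuple tuple)
+        {
+            return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
+                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+        }
+
+        private void writeToHDFS() { /* as shown above */ }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) { }
+    }
+#+END_SRC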
+
+** Summary
+
+The full source of this test application can be found on
+[[storm-sample-project][GitHub]].
+
+Apache Storm and Apache Kafka both have great potential in the real-time
+streaming market and have so far proven themselves to be very capable systems
+for performing real-time analytics.
+
+Stay tuned, as the next post in this series will be an overview of streaming
+with Apache Spark.
+
+** Related Links / References
+
+- [[storm][Apache Storm Project Page]]
+
+- [[storm-multi-lingual][Storm Multi-Language Documentation]]
+
+- [[kafka][Apache Kafka Project Page]]
+
+- [[kafka-benchmark][LinkedIn Kafka Benchmarking: 2 million writes per
+ second]]
+
+- [[storm-integrates][Storm Integration Documentation]]
+
+- [[supervisord][Supervisord Project Page]]
+
+- [[docker][Docker IO Project Page]]
+
+- [[storm-kafka-spout-github][Storm-Kafka Source]]
+
+- [[storm-sample-project][Full Source of Test Project]]
+
+- [[storm-incubation-proposal][Apache Storm Incubation Proposal]]
+
+- [[jackson-databind][Jackson Databind Project Page]]
+
+- [[wiki-bag-of-words][Wikipedia: Bag of words]]
+
+- [[storm-trident-overview][Storm Trident API Overview]]
+
+- [[wiki-srp][Wikipedia: Single responsibility principle]]
+
+- [[wiki-stemming][Wikipedia: Stemming]]
+
+- [[storm-common-patterns][Storm Documentation: Common Patterns]]
+
+- [[storm-docs-stream-grouping][Stream Groupings]]