author: Kenny Ballou <kballou@devnulllabs.io>, 2018-01-28 12:34:08 -0700
committer: Kenny Ballou <kballou@devnulllabs.io>, 2018-08-19 08:13:27 -0600
commit: 1379fcdbced6666a81946baf48bfcccd7c7fbc5c (patch)
tree: 713c3a8f2317faf1c0541b3e51d8235fd9e7ca64
parent: d0b8889c59c785f3c98787d6a47a4ac8b55cef7d (diff)
spark streaming post conversion
 content/blog/Spark.markdown | 738 ----
 posts/spark.org             | 783 ++++
 2 files changed, 783 insertions(+), 738 deletions(-)
diff --git a/content/blog/Spark.markdown b/content/blog/Spark.markdown
deleted file mode 100644
index e3d7b6f..0000000
--- a/content/blog/Spark.markdown
+++ /dev/null
@@ -1,738 +0,0 @@
---
title: "Real-Time Streaming with Apache Spark Streaming"
description: "Overview of Apache Spark and a sample Twitter Sentiment Analysis"
tags:
  - "Apache Spark"
  - "Apache Kafka"
  - "Apache"
  - "Java"
  - "Sentiment Analysis"
  - "Real-time Streaming"
  - "ZData Inc."
date: "2014-08-18"
categories:
  - "Apache"
  - "Development"
  - "Real-time Systems"
slug: "real-time-streaming-apache-spark-streaming"
---

This is the second post in a series on real-time systems tangential to the
Hadoop ecosystem. [Last time][6], we talked about [Apache Kafka][5] and Apache
Storm for use in a real-time processing engine. Today, we will be exploring
Apache Spark (Streaming) as part of a real-time processing engine.

## About Spark ##

[Apache Spark][1] is a general-purpose, large-scale processing engine,
recently fully inducted as an Apache project and currently under very active
development. As of this writing, Spark is at version 1.0.2, and 1.1 will be
released some time soon.

Spark is intended to be a drop-in replacement for Hadoop MapReduce, providing
the benefit of improved performance. Combine Spark with its related projects
and libraries -- [Spark SQL (formerly Shark)][14], [Spark Streaming][15],
[Spark MLlib][16], [GraphX][17], among others -- and a very capable and
promising processing stack emerges. Spark is capable of reading from HBase,
Hive, Cassandra, and any HDFS data source. Not to mention the many external
libraries that enable consuming data from many more sources, e.g., hooking
Apache Kafka into Spark Streaming is trivial. Further, the Spark Streaming
project provides the ability to continuously compute transformations on data.

### Resilient Distributed Datasets ###

Apache Spark's primitive type is the Resilient Distributed Dataset (RDD). All
transformations, `map`, `join`, `reduce`, etc., in Spark revolve around this
type. RDD's can be created in one of three ways: _parallelizing_ (distributing
a local dataset); reading a stable, external data source, such as an HDFS
file; or transforming existing RDD's.

In Java, parallelizing may look like:

    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> distData = sc.parallelize(data);

Where `sc` defines the Spark context.

Similarly, reading a file from HDFS may look like:

    JavaRDD<String> distFile = sc.textFile("hdfs:///data.txt");

The resiliency of RDD's comes from their [lazy][34] materialization and the
information required to enable this lazy nature. RDD's are not always fully
materialized, but they _do_ contain enough information (their lineage) to be
(re)created from a stable source [[Zaharia et al.][32]].

RDD's are distributed among the participating machines, and RDD
transformations are coarse-grained -- the same transformation will be applied
to _every_ element in an RDD. The number of partitions in an RDD is generally
defined by the locality of the stable source; however, the user may control
this number via repartitioning.

Another important property to mention: RDD's are actually immutable.
This immutability can be illustrated with [Spark's][27] Word Count example:

    JavaRDD<String> file = sc.textFile("hdfs:///data.txt");
    JavaRDD<String> words = file.flatMap(
        new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) {
                return Arrays.asList(line.split(" "));
            }
        }
    );
    JavaPairRDD<String, Integer> pairs = words.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) {
                return new Tuple2<String, Integer>(word, 1);
            }
        }
    );
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) { return a + b; }
        }
    );
    counts.saveAsTextFile("hdfs:///data_counted.txt");

This is the canonical word count example, but here is a brief explanation:
load a file into an RDD, split the words into a new RDD, map the words into
pairs where each word is given a count (one), then reduce the counts of each
word by a key, in this case the word itself. Notice, each operation,
`mapToPair`, `flatMap`, `reduceByKey`, creates a _new_ RDD.

To bring all these properties together: Resilient Distributed Datasets are
read-only, lazy, distributed sets of elements that can have a chain of
transformations applied to them. They facilitate resiliency by storing lineage
graphs of the transformations (to be) applied, and they [parallelize][35] the
computations by partitioning the data among the participating machines.

### Discretized Streams ###

Moving to Spark Streaming, the primitive is still RDD's. However, there is
another type for encapsulating a continuous stream of data: Discretized
Streams, or DStreams. DStreams are defined as sequences of RDD's. A DStream is
created from an input source, such as Apache Kafka, or from the transformation
of another DStream.

It turns out programming against DStreams is _very_ similar to programming
against RDD's.
The same word count code can be slightly modified to create a streaming word
counter:

    JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);
    JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) {
                return Arrays.asList(line.split(" "));
            }
        }
    );
    JavaPairDStream<String, Integer> pairs = words.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) {
                return new Tuple2<String, Integer>(word, 1);
            }
        }
    );
    JavaPairDStream<String, Integer> counts = pairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) { return a + b; }
        }
    );
    counts.print();

Notice, really the only change from the first example is the return types. In
the streaming context, transformations work on streams of RDD's; Spark handles
applying the functions (which work against the data in the RDD's) to the RDD's
in the current batch/DStream.

Though programming against DStreams is similar, there are some differences as
well. Chiefly, DStreams also have _stateful_ transformations. These include
sharing state between batches/intervals and modifying the current frame when
aggregating over a sliding window.

>The key idea is to treat streaming as a series of short batch jobs, and bring
>down the latency of these jobs as much as possible. This brings many of the
>benefits of batch processing models to stream processing, including clear
>consistency semantics and a new parallel recovery technique...
[[Zaharia et al.][33]]

### Hadoop Requirements ###

Technically speaking, Apache Spark does [_not_][30] require Hadoop to be fully
functional. In a cluster setting, however, a means of sharing files between
tasks will need to be facilitated. This could be accomplished through [S3][8],
[NFS][9], or, more typically, HDFS.
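The micro-batch model from the quote above is easy to imitate without a cluster. The following is a toy, Spark-free sketch (none of these class or method names are Spark API; they are illustrative only): each batch is reduced to per-word counts, and a sliding window over the last two batches is re-aggregated as new batches arrive, mirroring a windowed DStream aggregation.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MicroBatchWindow {
    // Count words in a single micro-batch, analogous to one RDD in a DStream.
    static Map<String, Integer> countBatch(List<String> lines) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines)
            for (String word : line.split(" "))
                counts.merge(word, 1, Integer::sum);
        return counts;
    }

    // Sum the per-batch counts currently inside the sliding window.
    static Map<String, Integer> windowTotals(Deque<Map<String, Integer>> window) {
        Map<String, Integer> totals = new HashMap<String, Integer>();
        for (Map<String, Integer> batch : window)
            for (Map.Entry<String, Integer> e : batch.entrySet())
                totals.merge(e.getKey(), e.getValue(), Integer::sum);
        return totals;
    }

    public static void main(String[] args) {
        int windowSize = 2; // keep only the last two batches in the window
        Deque<Map<String, Integer>> window = new ArrayDeque<>();
        List<List<String>> batches = Arrays.asList(
            Arrays.asList("spark streaming"),
            Arrays.asList("spark kafka"),
            Arrays.asList("kafka kafka"));
        for (List<String> batch : batches) {
            window.addLast(countBatch(batch));
            if (window.size() > windowSize)
                window.removeFirst();
            System.out.println(windowTotals(window));
        }
    }
}
```

Each iteration is a "short batch job" over in-memory data; a real Spark window additionally checkpoints and recovers the per-batch state in parallel.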

### Running Spark Applications ###

Apache Spark applications can run in [standalone mode][18] or be managed by
[YARN][10] ([Running Spark on YARN][19]), [Mesos][11] ([Running Spark on
Mesos][20]), and even [EC2][28] ([Running Spark on EC2][29]). Furthermore, if
running under YARN or Mesos, Spark does not need to be installed to work. That
is, Spark code can execute on YARN and Mesos clusters without change to the
cluster.

### Language Support ###

Currently, Apache Spark supports the Scala, Java, and Python programming
languages, though this post will only be discussing examples in Java.

### Initial Thoughts ###

Getting away from the idea of directed acyclic graphs (DAG's) is -- maybe --
both a bit of a leap and a benefit. Although it is perfectly acceptable to
define Spark's transformations altogether as a DAG, this can feel awkward when
developing Spark applications. Describing the transformations as [Monadic][13]
feels much more natural. Of course, a monad structure fits the DAG analogy
quite well, especially when considered in some of the physical analogies such
as assembly lines.

Java's, and consequently Spark's, type strictness was an initial hurdle to get
accustomed to. But overall, this is good. It means the compiler will catch a
lot of issues with transformations early.

Depending on Scala's `Tuple[\d]` classes feels second-class, but this is only
a minor tedium. It's too bad current versions of Java don't have good classes
for this common structure.

YARN and Mesos integration is a very nice benefit, as it allows full-stack
analytics without oversubscribing clusters. Furthermore, it gives the ability
to add to existing infrastructure without overloading the developers and the
system administrators with _yet another_ computational suite and/or resource
manager.

On the negative side of things, dependency hell can creep into Spark projects.
Your project and Spark (and possibly Spark's dependencies) may depend on a
common artifact. If the versions don't [converge][21], many subtle problems
can emerge. There is an [experimental configuration option][4] to help
alleviate this problem; however, for me, it caused more problems than it
solved.

## Test Project: Twitter Stream Sentiment Analysis ##

To really test Spark (Streaming), a Twitter Sentiment Analysis project was
developed. It's almost a direct port of the [Storm code][3]. Though there is
an external library for hooking Spark directly into Twitter, Kafka is used so
a more precise comparison of Spark and Storm can be made.

When the processing is finished, the data are written to HDFS and posted to a
simple NodeJS application.

### Setup ###

The setup is the same as [last time][6]: a 5-node Vagrant virtual cluster,
with each node running 64-bit CentOS 6.5 and given 1 core and 1024MB of RAM.
Every node is running HDFS (datanode), YARN worker nodes (nodemanager),
ZooKeeper, and Kafka. The first node, `node0`, is the namenode and resource
manager. `node0` is also running a [Docker][7] container with a NodeJS
application for reporting purposes.

### Application Overview ###

This project follows a very similar process structure as the Storm Topology
from last time.

{{< figure src="/media/SentimentAnalysisTopology.png"
    alt="Sentiment Analysis Topology" >}}

However, each node in the above graph is actually a transformation on the
current DStream and not an individual process (or group of processes).

This test project similarly uses the same [simple Kafka producer][22]
developed before. This Kafka producer will be how data are ingested by the
system.

[A lot of this overview will be a rehashing of last time.]

#### Kafka Receiver Stream ####

The data processed is received from a Kafka stream and is implemented via the
[external Kafka][23] library.
This process simply creates a connection to the Kafka broker(s), consuming
messages from the given set of topics.

##### Stripping Kafka Message IDs #####

It turns out the messages from Kafka are returned as tuples, more specifically
pairs, with the message ID and the message content. Before continuing, the
message ID is stripped and the Twitter JSON data is passed down the pipeline.

#### Twitter Data JSON Parsing ####

As was the case last time, the important parts (tweet ID, tweet text, and
language code) need to be extracted from the JSON. Furthermore, this project
only parses English tweets; non-English tweets are filtered out at this stage.

#### Filtering and Stemming ####

Many tweets contain messy or otherwise unnecessary characters and punctuation
that can be safely ignored. Moreover, there may also be many common words that
cannot be reliably scored either positively or negatively. At this stage,
these symbols and _stop words_ are filtered out.

#### Classifiers ####

Both the positive classifier and the negative classifier are in separate `map`
transformations. The implementation of both follows the [Bag-of-words][25]
model.

#### Joining and Scoring ####

Because the classifiers are done separately and a join is contrived, the next
step is to join the classifier scores together and actually declare a winner.
It turns out this is quite trivial to do in Spark.

#### Reporting: HDFS and HTTP POST ####

Finally, once the tweets are joined and scored, the scores need to be
reported. This is accomplished by writing the final tuples to HDFS and posting
a JSON object of the tuple to a simple NodeJS application.

This process turned out not to be as awkward as was the case with Storm. The
`foreachRDD` function of DStreams is a natural way to do side-effect-inducing
operations that don't necessarily transform the data.
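The filtering and stemming stage described above boils down to two regular-expression passes. Here is a minimal, Spark-free sketch of that logic; the `stripSymbols`/`removeStopWords` names and the tiny stop-word list are illustrative stand-ins, not the project's actual helpers:

```java
import java.util.Arrays;
import java.util.List;

public class TweetCleaner {
    // Drop everything except letters and whitespace, then normalize case.
    static String stripSymbols(String text) {
        return text.replaceAll("[^a-zA-Z\\s]", "").trim().toLowerCase();
    }

    // Remove stop words with word-boundary matches, then collapse the
    // whitespace left behind (a small addition for readability).
    static String removeStopWords(String text, List<String> stopWords) {
        for (String word : stopWords)
            text = text.replaceAll("\\b" + word + "\\b", "");
        return text.replaceAll("\\s+", " ").trim();
    }

    public static void main(String[] args) {
        String raw = "Spark is GREAT, isn't it?!";
        String cleaned = removeStopWords(stripSymbols(raw),
                                         Arrays.asList("is", "it"));
        System.out.println(cleaned); // prints "spark great isnt"
    }
}
```

Note that the word-boundary anchors (`\b`) keep "is" from being stripped out of "isnt"; without them, stop-word removal would mangle longer words.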

### Implementing the Kafka Producer ###

See the [post][6] from last time for the details of the Kafka producer; this
has not changed.

### Implementing the Spark Streaming Application ###

Diving into the code, here are some of the primary aspects of this project.
The full source of this test application can be found on [GitHub][24].

#### Creating Spark Context, Wiring Transformation Chain ####

The Spark context, the data source, and the transformations need to be
defined; then the context needs to be started. This is all accomplished with
the following code:

    SparkConf conf = new SparkConf()
        .setAppName("Twitter Sentiment Analysis");

    if (args.length > 0)
        conf.setMaster(args[0]);
    else
        conf.setMaster("local[2]");

    JavaStreamingContext ssc = new JavaStreamingContext(
        conf,
        new Duration(2000));

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(KAFKA_TOPIC, KAFKA_PARALLELIZATION);

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(
            ssc,
            Properties.getString("rts.spark.zkhosts"),
            "twitter.sentimentanalysis.kafka",
            topicMap);

    JavaDStream<String> json = messages.map(
        new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        }
    );

    JavaPairDStream<Long, String> tweets = json.mapToPair(
        new TwitterFilterFunction());

    JavaPairDStream<Long, String> filtered = tweets.filter(
        new Function<Tuple2<Long, String>, Boolean>() {
            public Boolean call(Tuple2<Long, String> tweet) {
                return tweet != null;
            }
        }
    );

    JavaDStream<Tuple2<Long, String>> tweetsFiltered = filtered.map(
        new TextFilterFunction());

    tweetsFiltered = tweetsFiltered.map(
        new StemmingFunction());

    JavaPairDStream<Tuple2<Long, String>, Float> positiveTweets =
        tweetsFiltered.mapToPair(new PositiveScoreFunction());

    JavaPairDStream<Tuple2<Long, String>, Float> negativeTweets =
        tweetsFiltered.mapToPair(new NegativeScoreFunction());

    JavaPairDStream<Tuple2<Long, String>, Tuple2<Float, Float>> joined =
        positiveTweets.join(negativeTweets);

    JavaDStream<Tuple4<Long, String, Float, Float>> scoredTweets =
        joined.map(new Function<Tuple2<Tuple2<Long, String>,
                                       Tuple2<Float, Float>>,
                                Tuple4<Long, String, Float, Float>>() {
            public Tuple4<Long, String, Float, Float> call(
                Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>> tweet)
            {
                return new Tuple4<Long, String, Float, Float>(
                    tweet._1()._1(),
                    tweet._1()._2(),
                    tweet._2()._1(),
                    tweet._2()._2());
            }
        });

    JavaDStream<Tuple5<Long, String, Float, Float, String>> result =
        scoredTweets.map(new ScoreTweetsFunction());

    result.foreachRDD(new FileWriter());
    result.foreachRDD(new HTTPNotifierFunction());

    ssc.start();
    ssc.awaitTermination();

Some of the more trivial transforms are defined in-line. The others are
defined in their respective files.

#### Twitter Data Filter / Parser ####

Parsing the Twitter JSON data is one of the first transformations and is
accomplished with the help of the [Jackson Databind][26] library.

    JsonNode root = mapper.readValue(tweet, JsonNode.class);
    long id;
    String text;
    if (root.get("lang") != null &&
        "en".equals(root.get("lang").textValue()))
    {
        if (root.get("id") != null && root.get("text") != null)
        {
            id = root.get("id").longValue();
            text = root.get("text").textValue();
            return new Tuple2<Long, String>(id, text);
        }
        return null;
    }
    return null;

The `mapper` (`ObjectMapper`) object is defined at the class level so it is
not recreated _for each_ RDD in the DStream, a minor optimization.

You may recall, this is essentially the same code as [last time][6]. The only
real difference is that the tuple is returned instead of being emitted.
Because certain situations (e.g., a non-English tweet, a malformed tweet)
return null, the nulls will need to be filtered out.
Thankfully, Spark provides a simple way to accomplish this:

    JavaPairDStream<Long, String> filtered = tweets.filter(
        new Function<Tuple2<Long, String>, Boolean>() {
            public Boolean call(Tuple2<Long, String> tweet) {
                return tweet != null;
            }
        }
    );

#### Text Filtering ####

As mentioned before, punctuation and other symbols are simply discarded, as
they provide little to no benefit to the classifiers:

    String text = tweet._2();
    text = text.replaceAll("[^a-zA-Z\\s]", "").trim().toLowerCase();
    return new Tuple2<Long, String>(tweet._1(), text);

Similarly, common words should be discarded as well:

    String text = tweet._2();
    List<String> stopWords = StopWords.getWords();
    for (String word : stopWords)
    {
        text = text.replaceAll("\\b" + word + "\\b", "");
    }
    return new Tuple2<Long, String>(tweet._1(), text);

#### Positive and Negative Scoring ####

Each classifier is defined in its own class. Both classifiers are _very_
similar in definition.

The positive classifier is primarily defined by:

    String text = tweet._2();
    Set<String> posWords = PositiveWords.getWords();
    String[] words = text.split(" ");
    int numWords = words.length;
    int numPosWords = 0;
    for (String word : words)
    {
        if (posWords.contains(word))
            numPosWords++;
    }
    return new Tuple2<Tuple2<Long, String>, Float>(
        new Tuple2<Long, String>(tweet._1(), tweet._2()),
        (float) numPosWords / numWords
    );

And the negative classifier:

    String text = tweet._2();
    Set<String> negWords = NegativeWords.getWords();
    String[] words = text.split(" ");
    int numWords = words.length;
    int numNegWords = 0;
    for (String word : words)
    {
        if (negWords.contains(word))
            numNegWords++;
    }
    return new Tuple2<Tuple2<Long, String>, Float>(
        new Tuple2<Long, String>(tweet._1(), tweet._2()),
        (float) numNegWords / numWords
    );

Because both are implemented as a `PairFunction`, a join situation is
contrived.
However, this could _easily_ be defined differently such that one classifier
is computed, then the next, without ever needing to join the two together.

#### Joining ####

It turns out joining in Spark is very easy to accomplish. So easy, in fact, it
can be handled with virtually no code at all:

    JavaPairDStream<Tuple2<Long, String>, Tuple2<Float, Float>> joined =
        positiveTweets.join(negativeTweets);

But because working with a tuple of nested tuples seems unwieldy, transform it
into a 4-element tuple:

    public Tuple4<Long, String, Float, Float> call(
        Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>> tweet)
    {
        return new Tuple4<Long, String, Float, Float>(
            tweet._1()._1(),
            tweet._1()._2(),
            tweet._2()._1(),
            tweet._2()._2());
    }

#### Scoring: Declaring Winning Class ####

Declaring the winning class is a matter of a simple map, comparing each
class's score and taking the greatest:

    String score;
    if (tweet._3() >= tweet._4())
        score = "positive";
    else
        score = "negative";
    return new Tuple5<Long, String, Float, Float, String>(
        tweet._1(),
        tweet._2(),
        tweet._3(),
        tweet._4(),
        score);

This declarer is optimistic about the neutral case (ties go to "positive") but
is otherwise very straightforward.
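The two classifiers and the winner declaration above can be exercised in isolation with plain Java. In this sketch, the word sets are tiny stand-ins for the project's `PositiveWords`/`NegativeWords` lists, and the scoring mirrors the bag-of-words fraction used above:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class BagOfWordsScore {
    // Fraction of the text's words that appear in the given word set,
    // the same score both classifiers compute.
    static float score(String text, Set<String> wordSet) {
        String[] words = text.split(" ");
        int hits = 0;
        for (String word : words)
            if (wordSet.contains(word))
                hits++;
        return (float) hits / words.length;
    }

    public static void main(String[] args) {
        Set<String> pos = new HashSet<>(Arrays.asList("great", "good"));
        Set<String> neg = new HashSet<>(Arrays.asList("bad", "awful"));
        String tweet = "spark is great kafka is good storm is bad";
        float posScore = score(tweet, pos); // 2 of 9 words
        float negScore = score(tweet, neg); // 1 of 9 words
        // Ties go to "positive", exactly as in the post's declarer.
        String winner = posScore >= negScore ? "positive" : "negative";
        System.out.println(winner); // prints "positive"
    }
}
```

Because both classifiers share this one scoring function, the join in the real pipeline really is only an artifact of running them as separate transformations.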

#### Reporting the Results ####

Finally, the pipeline completes by writing the results to HDFS:

    if (rdd.count() <= 0) return null;
    String path = Properties.getString("rts.spark.hdfs_output_file") +
        "_" +
        time.milliseconds();
    rdd.saveAsTextFile(path);

And by sending a POST request to a NodeJS application:

    rdd.foreach(new SendPostFunction());

Where `SendPostFunction` is primarily given by:

    String webserver = Properties.getString("rts.spark.webserv");
    HttpClient client = new DefaultHttpClient();
    HttpPost post = new HttpPost(webserver);
    String content = String.format(
        "{\"id\": \"%d\", " +
        "\"text\": \"%s\", " +
        "\"pos\": \"%f\", " +
        "\"neg\": \"%f\", " +
        "\"score\": \"%s\" }",
        tweet._1(),
        tweet._2(),
        tweet._3(),
        tweet._4(),
        tweet._5());

    try
    {
        post.setEntity(new StringEntity(content));
        HttpResponse response = client.execute(post);
        org.apache.http.util.EntityUtils.consume(response.getEntity());
    }
    catch (Exception ex)
    {
        Logger LOG = Logger.getLogger(this.getClass());
        LOG.error("exception thrown while attempting to post", ex);
        LOG.trace(null, ex);
    }

Each file written to HDFS _will_ have data in it, but the data written will be
small. A better batching procedure should be implemented so the files written
match the HDFS block size.

Similarly, a POST request is opened _for each_ scored tweet. This can be
expensive for both the Spark Streaming batch timings and the web server
receiving the requests. Batching here could similarly improve overall
performance of the system.

That said, writing these side-effects this way fits very naturally into the
Spark programming style.

## Summary ##

Apache Spark, in combination with Apache Kafka, has some amazing potential,
and not only in the streaming context, but as a drop-in replacement for
traditional Hadoop MapReduce. This combination makes it a very good candidate
for a part in an analytics engine.

Stay tuned, as the next post will be a more in-depth comparison between Apache
Spark and Apache Storm.

## Related Links / References ##

[1]: http://spark.apache.org/

* [Apache Spark][1]

[2]: http://inside-bigdata.com/2014/07/15/theres-spark-theres-fire-state-apache-spark-2014/

* [State of Apache Spark 2014][2]

[3]: https://github.com/zdata-inc/StormSampleProject

* [Storm Sample Project][3]

[4]: https://issues.apache.org/jira/browse/SPARK-939

* [SPARK-939][4]

[5]: http://kafka.apache.org

* [Apache Kafka][5]

[6]: https://kennyballou.com/blog/2014/07/real-time-streaming-storm-and-kafka

* [Real-Time Streaming with Apache Storm and Apache Kafka][6]

[7]: http://www.docker.io/

* [Docker IO Project Page][7]

[8]: http://aws.amazon.com/s3/

* [Amazon S3][8]

[9]: http://en.wikipedia.org/wiki/Network_File_System

* [Network File System (NFS)][9]

[10]: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html

* [Hadoop YARN][10]

[11]: http://mesos.apache.org

* [Apache Mesos][11]

[12]: http://spark.apache.org/docs/latest/streaming-programming-guide.html

* [Spark Streaming Programming Guide][12]

[13]: http://en.wikipedia.org/wiki/Monad_(functional_programming)

* [Monad][13]

[14]: http://spark.apache.org/sql/

* [Spark SQL][14]

[15]: http://spark.apache.org/streaming/

* [Spark Streaming][15]

[16]: http://spark.apache.org/mllib/

* [MLlib][16]

[17]: http://spark.apache.org/graphx/

* [GraphX][17]

[18]: http://spark.apache.org/docs/latest/spark-standalone.html

* [Spark Standalone Mode][18]

[19]: http://spark.apache.org/docs/latest/running-on-yarn.html

* [Running on YARN][19]

[20]: http://spark.apache.org/docs/latest/running-on-mesos.html

* [Running on Mesos][20]

[21]: http://cupofjava.de/blog/2013/02/01/fight-dependency-hell-in-maven/

* [Fight Dependency Hell in Maven][21]

[22]: https://github.com/zdata-inc/SimpleKafkaProducer

* [Simple Kafka Producer][22]

[23]: https://github.com/apache/spark/tree/master/external/kafka

* [Spark: External Kafka Library][23]

[24]: https://github.com/zdata-inc/SparkSampleProject

* [Spark Sample Project][24]

[25]: http://en.wikipedia.org/wiki/Bag-of-words_model

* [Wikipedia: Bag-of-words][25]

[26]: https://github.com/FasterXML/jackson-databind

* [Jackson Databind Project][26]

[27]: http://spark.apache.org/docs/latest/programming-guide.html

* [Spark Programming Guide][27]

[28]: http://aws.amazon.com/ec2/

* [Amazon EC2][28]

[29]: http://spark.apache.org/docs/latest/ec2-scripts.html

* [Running Spark on EC2][29]

[30]: http://spark.apache.org/faq.html

* [Spark FAQ][30]

[31]: http://databricks.com/blog/2014/03/26/spark-sql-manipulating-structured-data-using-spark-2.html

* [Future of Shark][31]

[32]: https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf

* [Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing (PDF)][32]

[33]: https://www.usenix.org/system/files/conference/hotcloud12/hotcloud12-final28.pdf

* [Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters (PDF)][33]

[34]: http://en.wikipedia.org/wiki/Lazy_evaluation

* [Wikipedia: Lazy evaluation][34]

[35]: http://en.wikipedia.org/wiki/Data_parallelism

* [Wikipedia: Data Parallelism][35]

diff --git a/posts/spark.org b/posts/spark.org
new file mode 100644
index 0000000..7ed0685
--- /dev/null
+++ b/posts/spark.org
@@ -0,0 +1,783 @@
#+TITLE: Real-Time Streaming with Apache Spark Streaming
#+DESCRIPTION: Overview of Apache Spark and a sample Twitter Sentiment Analysis
#+TAGS: Apache Spark
#+TAGS: Apache Kafka
#+TAGS: Apache
#+TAGS: Java
#+TAGS: Sentiment Analysis
#+TAGS: Real-time Streaming
#+TAGS: ZData Inc.
#+DATE: 2014-08-18
#+SLUG: real-time-streaming-apache-spark-streaming
#+LINK: storm-and-kafka /blog/2014/07/real-time-streaming-storm-and-kafka/
#+LINK: kafka https://kafka.apache.org/
#+LINK: spark https://spark.apache.org/
#+LINK: spark-sql https://spark.apache.org/sql/
#+LINK: spark-streaming https://spark.apache.org/streaming/
#+LINK: spark-mllib https://spark.apache.org/mllib/
#+LINK: spark-graphx https://spark.apache.org/graphx/
#+LINK: lazy-evaluation http://en.wikipedia.org/wiki/Lazy_evaluation
#+LINK: nsdi-12 https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf
#+LINK: spark-programming-guide http://spark.apache.org/docs/latest/programming-guide.html
#+LINK: data-parallelism http://en.wikipedia.org/wiki/Data_parallelism
#+LINK: spark-faq http://spark.apache.org/faq.html
#+LINK: S3 http://aws.amazon.com/s3/
#+LINK: NFS http://en.wikipedia.org/wiki/Network_File_System
#+LINK: HDFS
#+LINK: spark-standalone http://spark.apache.org/docs/latest/spark-standalone.html
#+LINK: YARN http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html
#+LINK: spark-running-yarn http://spark.apache.org/docs/latest/running-on-yarn.html
#+LINK: Mesos http://mesos.apache.org
#+LINK: spark-running-mesos http://spark.apache.org/docs/latest/running-on-mesos.html
#+LINK: EC2 http://aws.amazon.com/ec2/
#+LINK: spark-running-ec2 http://spark.apache.org/docs/latest/ec2-scripts.html
#+LINK: Scala http://www.scala-lang.org/
#+LINK: Java https://en.wikipedia.org/wiki/Java_%28programming_language%29
#+LINK: Python https://www.python.org/
#+LINK: monad-progrmaming http://en.wikipedia.org/wiki/Monad_(functional_programming)
#+LINK: maven-dependency-hell http://cupofjava.de/blog/2013/02/01/fight-dependency-hell-in-maven/
#+LINK: spark-939 https://issues.apache.org/jira/browse/SPARK-939
#+LINK: storm-sample-project https://github.com/zdata-inc/StormSampleProject
#+LINK: kafka-producer https://github.com/zdata-inc/SimpleKafkaProducer
#+LINK: storm-kafka-streaming https://kennyballou.com/blog/2014/07/real-time-streaming-storm-and-kafka
#+LINK: spark-sample-project https://github.com/zdata-inc/SparkSampleProject
#+LINK: jackson-databind https://github.com/FasterXML/jackson-databind
#+LINK: hotcloud12 https://www.usenix.org/system/files/conference/hotcloud12/hotcloud12-final28.pdf
#+LINK: spark-sql-future http://databricks.com/blog/2014/03/26/spark-sql-manipulating-structured-data-using-spark-2.html
#+LINK: bag-of-words http://en.wikipedia.org/wiki/Bag-of-words_model
#+LINK: spark-kafka-extern-lib https://github.com/apache/spark/tree/master/external/kafka
#+LINK: docker http://www.docker.io/
#+LINK: state-spark-2014 http://inside-bigdata.com/2014/07/15/theres-spark-theres-fire-state-apache-spark-2014/

#+BEGIN_PREVIEW
This is the second post in a series on real-time systems tangential to the
Hadoop ecosystem. [[storm-and-kafka][Last time]], we talked about
[[kafka][Apache Kafka]] and Apache Storm for use in a real-time processing
engine. Today, we will be exploring Apache Spark (Streaming) as part of a
real-time processing engine.
#+END_PREVIEW

** About Spark

[[spark][Apache Spark]] is a general-purpose, large-scale processing engine,
recently fully inducted as an Apache project and currently under very active
development. As of this writing, Spark is at version 1.0.2, and 1.1 will be
released some time soon.

Spark is intended to be a drop-in replacement for Hadoop MapReduce, providing
the benefit of improved performance. Combine Spark with its related projects
and libraries -- [[spark-sql][Spark SQL (formerly Shark)]],
[[spark-streaming][Spark Streaming]], [[spark-mllib][Spark MLlib]],
[[spark-graphx][GraphX]], among others -- and a very capable and promising
processing stack emerges. Spark is capable of reading from HBase, Hive,
Cassandra, and any HDFS data source.
Not to mention the many external libraries that enable consuming data from
many more sources, e.g., hooking Apache Kafka into Spark Streaming is trivial.
Further, the Spark Streaming project provides the ability to continuously
compute transformations on data.

*** Resilient Distributed Datasets

Apache Spark's primitive type is the Resilient Distributed Dataset (RDD). All
transformations, ~map~, ~join~, ~reduce~, etc., in Spark revolve around this
type. RDD's can be created in one of three ways: /parallelizing/ (distributing
a local dataset); reading a stable, external data source, such as an HDFS
file; or transforming existing RDD's.

In Java, parallelizing may look like:

#+BEGIN_SRC java
  List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
  JavaRDD<Integer> distData = sc.parallelize(data);
#+END_SRC

Where ~sc~ defines the Spark context.

Similarly, reading a file from HDFS may look like:

#+BEGIN_SRC java
  JavaRDD<String> distFile = sc.textFile("hdfs:///data.txt");
#+END_SRC

The resiliency of RDD's comes from their [[lazy-evaluation][lazy]]
materialization and the information required to enable this lazy nature. RDD's
are not always fully materialized, but they /do/ contain enough information
(their lineage) to be (re)created from a stable source [[nsdi-12][Zaharia et
al.]].

RDD's are distributed among the participating machines, and RDD
transformations are coarse-grained -- the same transformation will be applied
to /every/ element in an RDD. The number of partitions in an RDD is generally
defined by the locality of the stable source; however, the user may control
this number via repartitioning.

Another important property to mention: RDD's are actually immutable.
This
immutability can be illustrated with [[spark-programming-guide][Spark's]] word
count example:

#+BEGIN_SRC java
    JavaRDD<String> file = sc.textFile("hdfs:///data.txt");
    JavaRDD<String> words = file.flatMap(
        new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) {
                return Arrays.asList(line.split(" "));
            }
        }
    );
    JavaPairRDD<String, Integer> pairs = words.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) {
                return new Tuple2<String, Integer>(word, 1);
            }
        }
    );
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) { return a + b; }
        }
    );
    counts.saveAsTextFile("hdfs:///data_counted.txt");
#+END_SRC

This is the canonical word count example, but here is a brief explanation:
load a file into an RDD, split the words into a new RDD, map the words into
pairs where each word is given a count (one), then reduce the counts of each
word by a key, in this case the word itself. Notice that each operation,
~mapToPair~, ~flatMap~, ~reduceByKey~, creates a /new/ RDD.

To bring all these properties together, Resilient Distributed Datasets are
read-only, lazy, distributed sets of elements that can have a chain of
transformations applied to them. They facilitate resiliency by storing lineage
graphs of the transformations (to be) applied, and they
[[data-parallelism][parallelize]] the computations by partitioning the data
among the participating machines.

*** Discretized Streams

Moving to Spark Streaming, the primitive is still RDDs. However, there is
another type for encapsulating a continuous stream of data: Discretized
Streams, or DStreams. DStreams are defined as sequences of RDDs. A DStream is
created from an input source, such as Apache Kafka, or from the transformation
of another DStream.
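
The "sequence of short batch jobs" idea behind DStreams can be illustrated
without Spark at all. Below is a toy, plain-Java sketch (the class and method
names are hypothetical, not part of any Spark API) that applies the same word
count job to each micro-batch of a discretized "stream":

#+BEGIN_SRC java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MicroBatchSketch {
    // The same "short batch job" is applied to every interval's
    // worth of data -- here, a simple word count over a batch of lines.
    public static Map<String, Integer> countWords(List<String> batch) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : batch) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // A "stream" discretized into two micro-batches.
        List<List<String>> stream = Arrays.asList(
            Arrays.asList("spark streams data", "spark streams"),
            Arrays.asList("kafka feeds spark"));
        for (List<String> batch : stream) {
            System.out.println(countWords(batch));
        }
    }
}
#+END_SRC

Spark Streaming does essentially this per batch interval, with distribution,
scheduling, and fault tolerance layered on top.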

Turns out, programming against DStreams is /very/ similar to programming
against RDDs. The same word count code can be slightly modified to create a
streaming word counter:

#+BEGIN_SRC java
    JavaReceiverInputDStream<String> lines =
        ssc.socketTextStream("localhost", 9999);
    JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) {
                return Arrays.asList(line.split(" "));
            }
        }
    );
    JavaPairDStream<String, Integer> pairs = words.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) {
                return new Tuple2<String, Integer>(word, 1);
            }
        }
    );
    JavaPairDStream<String, Integer> counts = pairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) { return a + b; }
        }
    );
    counts.print();
#+END_SRC

Notice that the only real change from the first example is the return types.
In the streaming context, transformations work on streams of RDDs; Spark
handles applying the functions (which work against the data in the RDDs) to
the RDDs in the current batch/DStream.

Though programming against DStreams is similar, there are some differences as
well. Chiefly, DStreams also have /stateful/ transformations. These include
sharing state between batches/intervals and modifying the current frame when
aggregating over a sliding window.

#+BEGIN_QUOTE
    The key idea is to treat streaming as a series of short batch jobs, and
    bring down the latency of these jobs as much as possible. This brings
    many of the benefits of batch processing models to stream processing,
    including clear consistency semantics and a new parallel recovery
    technique...
    [[[https://www.usenix.org/system/files/conference/hotcloud12/hotcloud12-final28.pdf][Zaharia
    et al.]]]
#+END_QUOTE

*** Hadoop Requirements

Technically speaking, Apache Spark does [[spark-faq][/not/]] require Hadoop
to be fully functional. In a cluster setting, however, a means of sharing
files between tasks will need to be facilitated. This could be accomplished
through [[S3][S3]], [[NFS][NFS]], or, more typically, [[HDFS][HDFS]].

*** Running Spark Applications

Apache Spark applications can run in [[spark-standalone][standalone mode]] or
be managed by [[YARN][YARN]] ([[spark-running-yarn][Running Spark on YARN]]),
[[Mesos][Mesos]] ([[spark-running-mesos][Running Spark on Mesos]]), and even
[[EC2][EC2]] ([[spark-running-ec2][Running Spark on EC2]]). Furthermore, if
running under YARN or Mesos, Spark does not need to be installed to work.
That is, Spark code can execute on YARN and Mesos clusters without changes to
the cluster.

*** Language Support

Currently, Apache Spark supports the [[Scala][Scala]], [[Java][Java]], and
[[Python][Python]] programming languages, though this post will only discuss
examples in [[Java][Java]].

*** Initial Thoughts

Getting away from the idea of directed acyclic graphs (DAGs) may be both a
bit of a leap and a benefit. Although it is perfectly acceptable to define
Spark's transformations altogether as a DAG, this can feel awkward when
developing Spark applications. Describing the transformations as
[[monad-programming][monadic]] feels much more natural. Of course, a monad
structure fits the DAG analogy quite well, especially when considered in some
of the physical analogies, such as assembly lines.

Java's, and consequently Spark's, type strictness was an initial hurdle to
get accustomed to. But overall, this is good: it means the compiler will
catch a lot of issues with transformations early.

Depending on Scala's ~Tuple[\d]~ classes feels second-class, but this is only
a minor tedium. It's too bad current versions of Java don't have good classes
for this common structure.

YARN and Mesos integration is a very nice benefit, as it allows full-stack
analytics without oversubscribing clusters. Furthermore, it gives the ability
to add to existing infrastructure without overloading the developers and the
system administrators with /yet another/ computational suite and/or resource
manager.

On the negative side of things, dependency hell can creep into Spark
projects. Your project and Spark (and possibly Spark's dependencies) may
depend on a common artifact. If the versions don't
[[maven-dependency-hell][converge]], many subtle problems can emerge. There
is an [[spark-939][experimental configuration option]] to help alleviate this
problem; however, for me, it caused more problems than it solved.

** Test Project: Twitter Stream Sentiment Analysis

To really test Spark (Streaming), a Twitter sentiment analysis project was
developed. It's almost a direct port of the [[storm-sample-project][Storm
code]]. Though there is an external library for hooking Spark directly into
Twitter, Kafka is used so a more precise comparison of Spark and Storm can be
made.

When the processing is finished, the data are written to HDFS and posted to a
simple NodeJS application.

*** Setup
    :PROPERTIES:
    :CUSTOM_ID: setup
    :END:

The setup is the same as
[[https://kennyballou.com/blog/2014/07/real-time-streaming-storm-and-kafka][last
time]]: a 5-node Vagrant virtual cluster, with each node running 64-bit
CentOS 6.5, given 1 core and 1024MB of RAM. Every node is running HDFS
(datanode), YARN worker nodes (nodemanager), ZooKeeper, and Kafka. The first
node, ~node0~, is the namenode and resource manager. ~node0~ is also running
a [[http://www.docker.io/][Docker]] container with a NodeJS application for
reporting purposes.

*** Application Overview

This project follows a very similar process structure as the Storm topology
from last time.

[[file:/media/SentimentAnalysisTopology.png]]

However, each node in the above graph is actually a transformation on the
current DStream, not an individual process (or group of processes).

This project similarly uses the same [[kafka-producer][simple Kafka
producer]] developed last time. This Kafka producer is how data are ingested
by the system.

**** Kafka Receiver Stream
     :PROPERTIES:
     :CUSTOM_ID: kafka-receiver-stream
     :END:

The data processed are received from a Kafka stream, implemented via the
[[https://github.com/apache/spark/tree/master/external/kafka][external
Kafka]] library. This process simply creates a connection to the Kafka
broker(s), consuming messages from the given set of topics.

***** Stripping Kafka Message IDs
      :PROPERTIES:
      :CUSTOM_ID: stripping-kafka-message-ids
      :END:

It turns out the messages from Kafka are returned as tuples, more
specifically pairs, of the message ID and the message content. Before
continuing, the message ID is stripped and the Twitter JSON data is passed
down the pipeline.

**** Twitter Data JSON Parsing
     :PROPERTIES:
     :CUSTOM_ID: twitter-data-json-parsing
     :END:

As was the case last time, the important parts (tweet ID, tweet text, and
language code) need to be extracted from the JSON. Furthermore, this project
only parses English tweets; non-English tweets are filtered out at this
stage.

**** Filtering and Stemming
     :PROPERTIES:
     :CUSTOM_ID: filtering-and-stemming
     :END:

Many tweets contain messy or otherwise unnecessary characters and punctuation
that can be safely ignored. Moreover, there may also be many common words
that cannot be reliably scored either positively or negatively. At this
stage, these symbols and /stop words/ are filtered out.
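
Outside of Spark, this stage amounts to a regex strip plus stop-word removal.
A minimal, standalone sketch (the class name and the tiny stop-word list are
illustrative stand-ins, not the project's actual ~StopWords~ helper):

#+BEGIN_SRC java
import java.util.Arrays;
import java.util.List;

public class TweetCleaner {
    // Stand-in stop-word list; the real project loads a much larger set.
    static final List<String> STOP_WORDS = Arrays.asList("a", "the", "is", "at");

    public static String clean(String text) {
        // Drop everything except letters and whitespace, normalize case.
        String cleaned = text.replaceAll("[^a-zA-Z\\s]", "").trim().toLowerCase();
        // Remove stop words at word boundaries.
        for (String word : STOP_WORDS) {
            cleaned = cleaned.replaceAll("\\b" + word + "\\b", "");
        }
        return cleaned;
    }

    public static void main(String[] args) {
        System.out.println(clean("Hello, World!"));
    }
}
#+END_SRC

Note that removing a stop word leaves its surrounding whitespace behind; the
classifiers downstream split on spaces, so empty tokens only slightly dilute
the scores.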

**** Classifiers
     :PROPERTIES:
     :CUSTOM_ID: classifiers
     :END:

Both the positive classifier and the negative classifier are implemented as
separate ~map~ transformations. The implementation of both follows the
[[http://en.wikipedia.org/wiki/Bag-of-words_model][bag-of-words]] model.

**** Joining and Scoring
     :PROPERTIES:
     :CUSTOM_ID: joining-and-scoring
     :END:

Because the classifiers run separately, a join is contrived; the next step is
to join the classifier scores together and actually declare a winner. It
turns out this is quite trivial to do in Spark.

**** Reporting: HDFS and HTTP POST
     :PROPERTIES:
     :CUSTOM_ID: reporting-hdfs-and-http-post
     :END:

Finally, once the tweets are joined and scored, the scores need to be
reported. This is accomplished by writing the final tuples to HDFS and
posting a JSON object of each tuple to a simple NodeJS application.

This process turned out not to be as awkward as was the case with Storm. The
~foreachRDD~ function of DStreams is a natural way to perform side-effecting
operations that don't necessarily transform the data.

*** Implementing the Kafka Producer

See the [[storm-kafka-streaming][post]] from last time for the details of the
Kafka producer; this has not changed.

*** Implementing the Spark Streaming Application

Diving into the code, here are some of the primary aspects of this project.
The full source of this test application can be found on
[[spark-sample-project][GitHub]].

**** Creating the Spark Context, Wiring the Transformation Chain

The Spark context, the data source, and the transformations need to be
defined; then the context needs to be started.
This is all accomplished with the
following code:

#+BEGIN_SRC java
    SparkConf conf = new SparkConf()
        .setAppName("Twitter Sentiment Analysis");

    if (args.length > 0)
        conf.setMaster(args[0]);
    else
        conf.setMaster("local[2]");

    JavaStreamingContext ssc = new JavaStreamingContext(
        conf,
        new Duration(2000));

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(KAFKA_TOPIC, KAFKA_PARALLELIZATION);

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(
            ssc,
            Properties.getString("rts.spark.zkhosts"),
            "twitter.sentimentanalysis.kafka",
            topicMap);

    JavaDStream<String> json = messages.map(
        new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        }
    );

    JavaPairDStream<Long, String> tweets = json.mapToPair(
        new TwitterFilterFunction());

    JavaPairDStream<Long, String> filtered = tweets.filter(
        new Function<Tuple2<Long, String>, Boolean>() {
            public Boolean call(Tuple2<Long, String> tweet) {
                return tweet != null;
            }
        }
    );

    JavaDStream<Tuple2<Long, String>> tweetsFiltered = filtered.map(
        new TextFilterFunction());

    tweetsFiltered = tweetsFiltered.map(
        new StemmingFunction());

    JavaPairDStream<Tuple2<Long, String>, Float> positiveTweets =
        tweetsFiltered.mapToPair(new PositiveScoreFunction());

    JavaPairDStream<Tuple2<Long, String>, Float> negativeTweets =
        tweetsFiltered.mapToPair(new NegativeScoreFunction());

    JavaPairDStream<Tuple2<Long, String>, Tuple2<Float, Float>> joined =
        positiveTweets.join(negativeTweets);

    JavaDStream<Tuple4<Long, String, Float, Float>> scoredTweets =
        joined.map(new Function<Tuple2<Tuple2<Long, String>,
                                       Tuple2<Float, Float>>,
                                Tuple4<Long, String, Float, Float>>() {
            public Tuple4<Long, String, Float, Float> call(
                Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>> tweet)
            {
                return new Tuple4<Long, String, Float, Float>(
                    tweet._1()._1(),
                    tweet._1()._2(),
                    tweet._2()._1(),
                    tweet._2()._2());
            }
        });

    JavaDStream<Tuple5<Long, String, Float, Float, String>> result =
        scoredTweets.map(new ScoreTweetsFunction());

    result.foreachRDD(new FileWriter());
    result.foreachRDD(new HTTPNotifierFunction());

    ssc.start();
    ssc.awaitTermination();
#+END_SRC

Some of the more trivial transforms are defined in-line. The others are
defined in their respective files.

**** Twitter Data Filter / Parser

Parsing the Twitter JSON data is one of the first transformations and is
accomplished with the help of the [[jackson-databind][Jackson Databind]]
library.

#+BEGIN_SRC java
    JsonNode root = mapper.readValue(tweet, JsonNode.class);
    long id;
    String text;
    if (root.get("lang") != null &&
        "en".equals(root.get("lang").textValue()))
    {
        if (root.get("id") != null && root.get("text") != null)
        {
            id = root.get("id").longValue();
            text = root.get("text").textValue();
            return new Tuple2<Long, String>(id, text);
        }
        return null;
    }
    return null;
#+END_SRC

The ~mapper~ (~ObjectMapper~) object is defined at the class level so it is
not recreated /for each/ element in the DStream, a minor optimization.

You may recall, this is essentially the same code as
[[storm-kafka-streaming][last time]]. The only real difference is that the
tuple is returned instead of being emitted. Because certain situations (e.g.,
a non-English or malformed tweet) return null, the nulls need to be filtered
out.
Thankfully, Spark provides a simple way to accomplish this:

#+BEGIN_SRC java
    JavaPairDStream<Long, String> filtered = tweets.filter(
        new Function<Tuple2<Long, String>, Boolean>() {
            public Boolean call(Tuple2<Long, String> tweet) {
                return tweet != null;
            }
        }
    );
#+END_SRC

**** Text Filtering

As mentioned before, punctuation and other symbols are simply discarded, as
they provide little to no benefit to the classifiers:

#+BEGIN_SRC java
    String text = tweet._2();
    text = text.replaceAll("[^a-zA-Z\\s]", "").trim().toLowerCase();
    return new Tuple2<Long, String>(tweet._1(), text);
#+END_SRC

Similarly, common words are discarded as well:

#+BEGIN_SRC java
    String text = tweet._2();
    List<String> stopWords = StopWords.getWords();
    for (String word : stopWords)
    {
        text = text.replaceAll("\\b" + word + "\\b", "");
    }
    return new Tuple2<Long, String>(tweet._1(), text);
#+END_SRC

**** Positive and Negative Scoring

Each classifier is defined in its own class. Both classifiers are /very/
similar in definition.

The positive classifier is primarily defined by:

#+BEGIN_SRC java
    String text = tweet._2();
    Set<String> posWords = PositiveWords.getWords();
    String[] words = text.split(" ");
    int numWords = words.length;
    int numPosWords = 0;
    for (String word : words)
    {
        if (posWords.contains(word))
            numPosWords++;
    }
    return new Tuple2<Tuple2<Long, String>, Float>(
        new Tuple2<Long, String>(tweet._1(), tweet._2()),
        (float) numPosWords / numWords
    );
#+END_SRC

And the negative classifier:

#+BEGIN_SRC java
    String text = tweet._2();
    Set<String> negWords = NegativeWords.getWords();
    String[] words = text.split(" ");
    int numWords = words.length;
    int numNegWords = 0;
    for (String word : words)
    {
        if (negWords.contains(word))
            numNegWords++;
    }
    return new Tuple2<Tuple2<Long, String>, Float>(
        new Tuple2<Long, String>(tweet._1(), tweet._2()),
        (float) numNegWords / numWords
    );
#+END_SRC

Because both implement a ~PairFunction~, a join situation is contrived.
However, this could /easily/ be defined differently such that one classifier
is computed, then the next, without ever needing to join the two together.

**** Joining

It turns out, joining in Spark is very easy to accomplish.
So easy, in
fact, it can be handled with virtually /no/ code:

#+BEGIN_SRC java
    JavaPairDStream<Tuple2<Long, String>, Tuple2<Float, Float>> joined =
        positiveTweets.join(negativeTweets);
#+END_SRC

But because working with a tuple of nested tuples is unwieldy, transform it
into a 4-element tuple:

#+BEGIN_SRC java
    public Tuple4<Long, String, Float, Float> call(
        Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>> tweet)
    {
        return new Tuple4<Long, String, Float, Float>(
            tweet._1()._1(),
            tweet._1()._2(),
            tweet._2()._1(),
            tweet._2()._2());
    }
#+END_SRC

**** Scoring: Declaring the Winning Class

Declaring the winning class is a matter of a simple map, comparing each
class's score and taking the greater:

#+BEGIN_SRC java
    String score;
    if (tweet._3() >= tweet._4())
        score = "positive";
    else
        score = "negative";
    return new Tuple5<Long, String, Float, Float, String>(
        tweet._1(),
        tweet._2(),
        tweet._3(),
        tweet._4(),
        score);
#+END_SRC

This declarer is optimistic about the neutral case (ties go to "positive"),
but is otherwise very straightforward.
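
Pulling the classifiers and the declarer together outside of Spark, here is a
small self-contained sketch of the scoring logic (the word sets are
illustrative stand-ins for the project's ~PositiveWords~ and ~NegativeWords~
helpers):

#+BEGIN_SRC java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class SentimentSketch {
    // Illustrative stand-ins for the project's word lists.
    public static final Set<String> POSITIVE =
        new HashSet<>(Arrays.asList("good", "great", "love"));
    public static final Set<String> NEGATIVE =
        new HashSet<>(Arrays.asList("bad", "awful", "hate"));

    // Bag-of-words score: the fraction of words found in the given set.
    public static float score(String text, Set<String> words) {
        String[] tokens = text.split(" ");
        int hits = 0;
        for (String token : tokens) {
            if (words.contains(token)) {
                hits++;
            }
        }
        return (float) hits / tokens.length;
    }

    // Declare the winner the same way the declarer above does:
    // ties go to "positive".
    public static String declare(String text) {
        return score(text, POSITIVE) >= score(text, NEGATIVE)
            ? "positive" : "negative";
    }

    public static void main(String[] args) {
        System.out.println(declare("i love this great api"));
        System.out.println(declare("awful docs and bad errors"));
    }
}
#+END_SRC

Normalizing by the word count keeps the two scores comparable regardless of
tweet length, which is what makes the simple ~>=~ comparison meaningful.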

**** Reporting the Results

Finally, the pipeline completes by writing the results to HDFS:

#+BEGIN_SRC java
    if (rdd.count() <= 0) return null;
    String path = Properties.getString("rts.spark.hdfs_output_file")
        + "_"
        + time.milliseconds();
    rdd.saveAsTextFile(path);
#+END_SRC

And by sending a POST request to a NodeJS application:

#+BEGIN_SRC java
    rdd.foreach(new SendPostFunction());
#+END_SRC

where ~SendPostFunction~ is primarily given by:

#+BEGIN_SRC java
    String webserver = Properties.getString("rts.spark.webserv");
    HttpClient client = new DefaultHttpClient();
    HttpPost post = new HttpPost(webserver);
    String content = String.format(
        "{\"id\": \"%d\", " +
        "\"text\": \"%s\", " +
        "\"pos\": \"%f\", " +
        "\"neg\": \"%f\", " +
        "\"score\": \"%s\" }",
        tweet._1(),
        tweet._2(),
        tweet._3(),
        tweet._4(),
        tweet._5());

    try
    {
        post.setEntity(new StringEntity(content));
        HttpResponse response = client.execute(post);
        org.apache.http.util.EntityUtils.consume(response.getEntity());
    }
    catch (Exception ex)
    {
        Logger LOG = Logger.getLogger(this.getClass());
        LOG.error("exception thrown while attempting to post", ex);
        LOG.trace(null, ex);
    }
#+END_SRC

Each file written to HDFS /will/ have data in it, but the data written will
be small. A better batching procedure should be implemented so the files
written match the HDFS block size.

Similarly, a POST request is opened /for each/ scored tweet. This can be
expensive for both the Spark Streaming batch timings and the web server
receiving the requests. Batching here could similarly improve the overall
performance of the system.

That said, writing these side-effects this way fits very naturally into the
Spark programming style.

** Summary

Apache Spark, in combination with Apache Kafka, has some amazing potential.
And not only in the streaming context, but as a drop-in replacement for
traditional Hadoop MapReduce.
This combination makes it a very good
candidate for a part in an analytics engine.

Stay tuned, as the next post will be a more in-depth comparison between
Apache Spark and Apache Storm.

** Related Links / References

- [[spark][Apache Spark]]

- [[state-spark-2014][State of Apache Spark 2014]]

- [[storm-sample-project][Storm Sample Project]]

- [[spark-939][SPARK-939]]

- [[kafka][Apache Kafka]]

- [[storm-kafka-streaming][Real-Time Streaming with Apache Storm and Apache
  Kafka]]

- [[docker][Docker IO Project Page]]

- [[S3][Amazon S3]]

- [[NFS][Network File System (NFS)]]

- [[YARN][Hadoop YARN]]

- [[mesos][Apache Mesos]]

- [[spark-streaming][Spark Streaming Programming Guide]]

- [[monad-programming][Monad]]

- [[spark-sql][Spark SQL]]

- [[spark-streaming][Spark Streaming]]

- [[spark-mllib][MLlib]]

- [[spark-graphx][GraphX]]

- [[spark-standalone][Spark Standalone Mode]]

- [[spark-running-yarn][Running on YARN]]

- [[spark-running-mesos][Running on Mesos]]

- [[maven-dependency-hell][Fight Dependency Hell in Maven]]

- [[kafka-producer][Simple Kafka Producer]]

- [[spark-kafka-extern-lib][Spark: External Kafka Library]]

- [[spark-sample-project][Spark Sample Project]]

- [[bag-of-words][Wikipedia: Bag-of-words]]

- [[jackson-databind][Jackson Databind Project]]

- [[spark-programming-guide][Spark Programming Guide]]

- [[EC2][Amazon EC2]]

- [[spark-running-ec2][Running Spark on EC2]]

- [[spark-faq][Spark FAQ]]

- [[spark-sql-future][Future of Shark]]

- [[nsdi-12][Resilient Distributed Datasets: A Fault-Tolerant Abstraction for
  In-Memory Cluster Computing (PDF)]]

- [[hotcloud12][Discretized Streams: An Efficient and Fault-Tolerant Model
  for Stream Processing on Large Clusters (PDF)]]

- [[lazy-evaluation][Wikipedia: Lazy evaluation]]

- [[data-parallelism][Wikipedia: Data Parallelism]]