diff --git a/posts/spark.org b/posts/spark.org
new file mode 100644
index 0000000..7ed0685
--- /dev/null
+++ b/posts/spark.org
@@ -0,0 +1,783 @@
+#+TITLE: Real-Time Streaming with Apache Spark Streaming
+#+DESCRIPTION: Overview of Apache Spark and a sample Twitter Sentiment Analysis
+#+TAGS: Apache Spark
+#+TAGS: Apache Kafka
+#+TAGS: Apache
+#+TAGS: Java
+#+TAGS: Sentiment Analysis
+#+TAGS: Real-time Streaming
+#+TAGS: ZData Inc.
+#+DATE: 2014-08-18
+#+SLUG: real-time-streaming-apache-spark-streaming
+#+LINK: storm-and-kafka /blog/2014/07/real-time-streaming-storm-and-kafka/
+#+LINK: kafka https://kafka.apache.org/
+#+LINK: spark https://spark.apache.org/
+#+LINK: spark-sql https://spark.apache.org/sql/
+#+LINK: spark-streaming https://spark.apache.org/streaming/
+#+LINK: spark-mllib https://spark.apache.org/mllib/
+#+LINK: spark-graphx https://spark.apache.org/graphx/
+#+LINK: lazy-evaluation http://en.wikipedia.org/wiki/Lazy_evaluation
+#+LINK: nsdi-12 https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf
+#+LINK: spark-programming-guide http://spark.apache.org/docs/latest/programming-guide.html
+#+LINK: data-parallelism http://en.wikipedia.org/wiki/Data_parallelism
+#+LINK: spark-faq http://spark.apache.org/faq.html
+#+LINK: S3 http://aws.amazon.com/s3/
+#+LINK: NFS http://en.wikipedia.org/wiki/Network_File_System
+#+LINK: HDFS
+#+LINK: spark-standalone http://spark.apache.org/docs/latest/spark-standalone.html
+#+LINK: YARN http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html
+#+LINK: spark-running-yarn http://spark.apache.org/docs/latest/running-on-yarn.html
+#+LINK: Mesos http://mesos.apache.org
+#+LINK: spark-running-mesos http://spark.apache.org/docs/latest/running-on-mesos.html
+#+LINK: EC2 http://aws.amazon.com/ec2/
+#+LINK: spark-running-ec2 http://spark.apache.org/docs/latest/ec2-scripts.html
+#+LINK: Scala http://www.scala-lang.org/
+#+LINK: Java https://en.wikipedia.org/wiki/Java_%28programming_language%29
+#+LINK: Python https://www.python.org/
+#+LINK: monad-programming http://en.wikipedia.org/wiki/Monad_(functional_programming)
+#+LINK: maven-dependency-hell http://cupofjava.de/blog/2013/02/01/fight-dependency-hell-in-maven/
+#+LINK: spark-939 https://issues.apache.org/jira/browse/SPARK-939
+#+LINK: storm-sample-project https://github.com/zdata-inc/StormSampleProject
+#+LINK: kafka-producer https://github.com/zdata-inc/SimpleKafkaProducer
+#+LINK: storm-kafka-streaming https://kennyballou.com/blog/2014/07/real-time-streaming-storm-and-kafka
+#+LINK: spark-sample-project https://github.com/zdata-inc/SparkSampleProject
+#+LINK: jackson-databind https://github.com/FasterXML/jackson-databind
+#+LINK: hotcloud12 https://www.usenix.org/system/files/conference/hotcloud12/hotcloud12-final28.pdf
+#+LINK: spark-sql-future http://databricks.com/blog/2014/03/26/spark-sql-manipulating-structured-data-using-spark-2.html
+#+LINK: bag-of-words http://en.wikipedia.org/wiki/Bag-of-words_model
+#+LINK: spark-kafka-extern-lib https://github.com/apache/spark/tree/master/external/kafka
+#+LINK: docker http://www.docker.io/
+#+LINK: state-spark-2014 http://inside-bigdata.com/2014/07/15/theres-spark-theres-fire-state-apache-spark-2014/
+
+#+BEGIN_PREVIEW
+This is the second post in a series on real-time systems tangential to the
+Hadoop ecosystem. [[storm-and-kafka][Last time]], we talked about
+[[kafka][Apache Kafka]] and Apache Storm for use in a real-time processing
+engine. Today, we will be exploring Apache Spark (Streaming) as part of a
+real-time processing engine.
+#+END_PREVIEW
+
+** About Spark
+
+[[spark][Apache Spark]] is a general-purpose, large-scale processing engine,
+recently fully inducted as an Apache top-level project, and it is currently
+under very active development. As of this writing, Spark is at version 1.0.2,
+and 1.1 will be released soon.
+
+Spark is intended to be a drop-in replacement for Hadoop MapReduce that
+provides the benefit of improved performance. Combine Spark with its related
+projects and libraries -- [[spark-sql][Spark SQL (formerly Shark)]],
+[[spark-streaming][Spark Streaming]], [[spark-mllib][Spark MLlib]],
+[[spark-graphx][GraphX]], among others -- and a very capable and promising
+processing stack emerges. Spark is capable of reading from HBase, Hive,
+Cassandra, and any HDFS data source, not to mention the many external
+libraries that enable consuming data from many more sources; e.g., hooking
+Apache Kafka into Spark Streaming is trivial. Further, the Spark Streaming
+project provides the ability to continuously compute transformations on data.
+
+*** Resilient Distributed Datasets
+
+Apache Spark's primitive type is the Resilient Distributed Dataset (RDD). All
+transformations, ~map~, ~join~, ~reduce~, etc., in Spark revolve around this
+type. RDD's can be created in one of three ways: /parallelizing/ (distributing
+a local dataset); reading a stable, external data source, such as an HDFS file;
+or transforming existing RDD's.
+
+In Java, parallelizing may look like:
+
+#+BEGIN_SRC java
+ List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+ JavaRDD<Integer> distData = sc.parallelize(data);
+#+END_SRC
+
+Here, ~sc~ is the Spark context (a ~JavaSparkContext~).
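+
+The context itself is constructed from a ~SparkConf~; a minimal sketch (the
+application name and master URL below are placeholders, not values from this
+project):
+
+#+BEGIN_SRC java
+    // requires org.apache.spark.SparkConf and
+    // org.apache.spark.api.java.JavaSparkContext
+    SparkConf conf = new SparkConf()
+        .setAppName("rdd-examples")   // placeholder application name
+        .setMaster("local[2]");       // placeholder master; use a cluster URL otherwise
+    JavaSparkContext sc = new JavaSparkContext(conf);
+#+END_SRC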
+
+Similarly, reading a file from HDFS may look like:
+
+#+BEGIN_SRC java
+ JavaRDD<String> distFile = sc.textFile("hdfs:///data.txt");
+#+END_SRC
+
+The resiliency of RDD's comes from their [[lazy-evaluation][lazy]]
+materialization and the information required to enable this lazy nature. RDD's
+are not always fully materialized but they /do/ contain enough information
+(their lineage) to be (re)created from a stable source [[nsdi-12][Zaharia et
+al.]].
+
+RDD's are distributed among the participating machines, and RDD transformations
+are coarse-grained -- the same transformation will be applied to /every/
+element in an RDD. The number of partitions in an RDD is generally determined
+by the locality of the stable source; however, the user may control this
+number via repartitioning.
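+
+For example, a minimal sketch of explicit repartitioning (the partition counts
+here are arbitrary):
+
+#+BEGIN_SRC java
+    // partitioning initially follows the HDFS blocks of the source file
+    JavaRDD<String> lines = sc.textFile("hdfs:///data.txt");
+    // shuffle the data into 16 partitions
+    JavaRDD<String> wider = lines.repartition(16);
+    // shrink back down to 4 partitions without a full shuffle
+    JavaRDD<String> narrower = wider.coalesce(4);
+#+END_SRC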
+
+Another important property worth mentioning: RDD's are immutable. This
+immutability can be illustrated with [[spark-programming-guide][Spark's]] Word
+Count example:
+
+#+BEGIN_SRC java
+ JavaRDD<String> file = sc.textFile("hdfs:///data.txt");
+ JavaRDD<String> words = file.flatMap(
+ new FlatMapFunction<String, String>() {
+ public Iterable<String> call(String line) {
+ return Arrays.asList(line.split(" "));
+ }
+ }
+ );
+    JavaPairRDD<String, Integer> pairs = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ public Tuple2<String, Integer> call(String word) {
+ return new Tuple2<String, Integer>(word, 1);
+ }
+ }
+ );
+ JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
+        new Function2<Integer, Integer, Integer>() {
+ public Integer call(Integer a, Integer b) { return a + b; }
+ }
+ );
+ counts.saveAsTextFile("hdfs:///data_counted.txt");
+#+END_SRC
+
+This is the canonical word count example, but here is a brief explanation: load
+a file into an RDD, split the words into a new RDD, map the words into pairs
+where each word is given a count (one), then reduce the counts of each word by
+a key, in this case the word itself. Notice that each operation, ~flatMap~,
+~mapToPair~, ~reduceByKey~, creates a /new/ RDD.
+
+To bring all these properties together, Resilient Distributed Datasets are
+read-only, lazy distributed sets of elements that can have a chain of
+transformations applied to them. They facilitate resiliency by storing lineage
+graphs of the transformations (to be) applied and they
+[[data-parallelism][parallelize]] the computations by partitioning the data
+among the participating machines.
+
+*** Discretized Streams
+
+Moving to Spark Streaming, the primitive is still the RDD. However, there is
+another type for encapsulating a continuous stream of data: Discretized Streams
+or DStreams. DStreams are defined as sequences of RDD's. A DStream is created
+from an input source, such as Apache Kafka, or from the transformation of
+another DStream.
+
+Turns out, programming against DStreams is /very/ similar to programming
+against RDD's. The same word count code can be slightly modified to create a
+streaming word counter:
+
+#+BEGIN_SRC java
+ JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);
+ JavaDStream<String> words = lines.flatMap(
+ new FlatMapFunction<String, String>() {
+ public Iterable<String> call(String line) {
+ return Arrays.asList(line.split(" "));
+ }
+ }
+ );
+    JavaPairDStream<String, Integer> pairs = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ public Tuple2<String, Integer> call(String word) {
+ return new Tuple2<String, Integer>(word, 1);
+ }
+ }
+ );
+ JavaPairDStream<String, Integer> counts = pairs.reduceByKey(
+        new Function2<Integer, Integer, Integer>() {
+ public Integer call(Integer a, Integer b) { return a + b; }
+ }
+ );
+ counts.print();
+#+END_SRC
+
+Notice that the only real change from the first example is the return
+types. In the streaming context, transformations work on streams of RDD's;
+Spark handles applying the functions (which work against the data in the
+RDD's) to the RDD's of the current batch in the DStream.
+
+Though programming against DStreams is similar, there are indeed some
+differences as well. Chiefly, DStreams also have /stateful/ transformations.
+These include sharing state between batches/intervals and modifying the
+current frame when aggregating over a sliding window.
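+
+For instance, the pair counts from the streaming word count above could be
+aggregated over a sliding window with ~reduceByKeyAndWindow~; a minimal sketch
+(the window and slide durations are arbitrary):
+
+#+BEGIN_SRC java
+    // count words over the last 30 seconds, recomputed every 10 seconds
+    JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
+        new Function2<Integer, Integer, Integer>() {
+            public Integer call(Integer a, Integer b) { return a + b; }
+        },
+        new Duration(30000),
+        new Duration(10000));
+    windowedCounts.print();
+#+END_SRC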
+
+#+BEGIN_QUOTE
+ The key idea is to treat streaming as a series of short batch jobs,
+ and bring down the latency of these jobs as much as possible. This
+ brings many of the benefits of batch processing models to stream
+ processing, including clear consistency semantics and a new parallel
+ recovery technique...
+  [[[hotcloud12][Zaharia et al.]]]
+#+END_QUOTE
+
+*** Hadoop Requirements
+
+Technically speaking, Apache Spark does [[spark-faq][/not/]] require Hadoop to
+be fully functional. In a cluster setting, however, a means of sharing files
+between tasks will need to be facilitated. This could be accomplished through
+[[S3][S3]], [[NFS][NFS]], or, more typically, [[HDFS][HDFS]].
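+
+In practice, the choice mostly changes the URI scheme handed to the context; a
+minimal sketch (the bucket and paths are placeholders):
+
+#+BEGIN_SRC java
+    JavaRDD<String> fromHdfs = sc.textFile("hdfs:///data.txt");
+    JavaRDD<String> fromS3 = sc.textFile("s3n://example-bucket/data.txt");
+    JavaRDD<String> fromNfs = sc.textFile("file:///mnt/shared/data.txt");
+#+END_SRC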
+
+*** Running Spark Applications
+
+Apache Spark applications can run in [[spark-standalone][standalone mode]] or
+be managed by [[YARN][YARN]] ([[spark-running-yarn][Running Spark on YARN]]),
+[[Mesos][Mesos]] ([[spark-running-mesos][Running Spark on Mesos]]), and even
+[[EC2][EC2]] ([[spark-running-ec2][Running Spark on EC2]]). Furthermore, if
+running under YARN or Mesos, Spark does not need to be installed to work. That
+is, Spark code can execute on YARN and Mesos clusters without changes to the
+cluster.
+
+*** Language Support
+
+Currently, Apache Spark supports the [[Scala][Scala]], [[Java][Java]], and
+[[Python][Python]] programming languages. This post, though, will only
+discuss examples in [[Java][Java]].
+
+*** Initial Thoughts
+
+Getting away from the idea of directed acyclic graphs (DAG's) may be both a
+bit of a leap and a benefit. Although it is perfectly acceptable to describe
+Spark's transformations altogether as a DAG, this can feel awkward when
+developing Spark applications. Describing the transformations as
+[[monad-programming][Monadic]] feels much more natural. Of course, a monad
+structure fits the DAG analogy quite well, especially when considered in some
+of the physical analogies such as assembly lines.
+
+Java's, and consequently Spark's, type strictness was an initial hurdle to
+get accustomed to. But overall, this is a good thing: it means the compiler
+will catch a lot of issues with transformations early.
+
+Depending on Scala's ~Tuple[\d]~ classes feels second-class, but this is
+only a minor tedium. It's too bad current versions of Java don't have
+good classes for this common structure.
+
+YARN and Mesos integration is a very nice benefit as it allows full-stack
+analytics without oversubscribing clusters. Furthermore, it gives the ability
+to add to existing infrastructure without overloading the developers and the
+system administrators with /yet another/ computational suite and/or resource
+manager.
+
+On the negative side of things, dependency hell can creep into Spark projects.
+Your project and Spark (and possibly Spark's dependencies) may depend on a
+common artifact. If the versions don't [[maven-dependency-hell][converge]],
+many subtle problems can emerge. There is an [[spark-939][experimental
+configuration option]] to help alleviate this problem; however, for me, it
+caused more problems than it solved.
+
+** Test Project: Twitter Stream Sentiment Analysis
+
+To really test Spark (Streaming), a Twitter Sentiment Analysis project was
+developed. It's almost a direct port of the [[storm-sample-project][Storm
+code]]. Though there is an external library for hooking Spark directly into
+Twitter, Kafka is used so a more precise comparison of Spark and Storm can be
+made.
+
+When the processing is finished, the data are written to HDFS and posted
+to a simple NodeJS application.
+
+*** Setup
+ :PROPERTIES:
+ :CUSTOM_ID: setup
+ :END:
+
+The setup is the same as [[storm-kafka-streaming][last time]]: a 5-node
+Vagrant virtual cluster, with each node running 64-bit CentOS 6.5 and given
+1 core and 1024MB of RAM. Every node is running HDFS (datanode), a YARN
+worker node (nodemanager), ZooKeeper, and Kafka. The first node, ~node0~, is
+the namenode and resource manager. ~node0~ is also running a
+[[docker][Docker]] container with a NodeJS application for reporting
+purposes.
+
+*** Application Overview
+
+This project follows a very similar process structure to the Storm topology
+from last time.
+
+[[file:/media/SentimentAnalysisTopology.png]]
+
+However, each node in the above graph is actually a transformation on the
+current DStream and not an individual process (or group of processes).
+
+This test project uses the same [[kafka-producer][simple Kafka producer]]
+developed last time. This Kafka producer is how data are ingested into the
+system.
+
+**** Kafka Receiver Stream
+ :PROPERTIES:
+ :CUSTOM_ID: kafka-receiver-stream
+ :END:
+
+The data are received from a Kafka stream, implemented via the
+[[spark-kafka-extern-lib][external Kafka]] library. This process simply
+creates a connection to the Kafka broker(s) and consumes messages from the
+given set of topics.
+
+***** Stripping Kafka Message IDs
+ :PROPERTIES:
+ :CUSTOM_ID: stripping-kafka-message-ids
+ :END:
+
+It turns out the messages from Kafka are returned as tuples, more
+specifically pairs, of the message ID and the message content. Before
+continuing, the message ID is stripped and the Twitter JSON data are
+passed down the pipeline.
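+
+A minimal sketch of this step (the same code appears in the full wiring
+below):
+
+#+BEGIN_SRC java
+    JavaDStream<String> json = messages.map(
+        new Function<Tuple2<String, String>, String>() {
+            public String call(Tuple2<String, String> message) {
+                // drop the Kafka message ID, keep the tweet JSON
+                return message._2();
+            }
+        }
+    );
+#+END_SRC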
+
+**** Twitter Data JSON Parsing
+ :PROPERTIES:
+ :CUSTOM_ID: twitter-data-json-parsing
+ :END:
+
+As was the case last time, the important parts (tweet ID, tweet text,
+and language code) need to be extracted from the JSON. Furthermore, this
+project only parses English tweets. Non-English tweets are filtered out
+at this stage.
+
+**** Filtering and Stemming
+ :PROPERTIES:
+ :CUSTOM_ID: filtering-and-stemming
+ :END:
+
+Many tweets contain messy or otherwise unnecessary characters and
+punctuation that can be safely ignored. Moreover, there may also be many
+common words that cannot be reliably scored either positively or
+negatively. At this stage, these symbols and /stop words/ should be
+filtered.
+
+**** Classifiers
+ :PROPERTIES:
+ :CUSTOM_ID: classifiers
+ :END:
+
+Both the Positive classifier and the Negative classifier are in separate
+~map~ transformations. The implementation of both follows the
+[[bag-of-words][Bag-of-words]] model.
+
+**** Joining and Scoring
+ :PROPERTIES:
+ :CUSTOM_ID: joining-and-scoring
+ :END:
+
+Because the classifiers are computed separately, a join is contrived; the
+next step is to join the classifier scores together and actually declare
+a winner. It turns out this is quite trivial to do in Spark.
+
+**** Reporting: HDFS and HTTP POST
+ :PROPERTIES:
+ :CUSTOM_ID: reporting-hdfs-and-http-post
+ :END:
+
+Finally, once the tweets are joined and scored, the scores need to be
+reported. This is accomplished by writing the final tuples to HDFS and
+posting a JSON object of the tuple to a simple NodeJS application.
+
+This process turned out to be less awkward than it was with Storm. The
+~foreachRDD~ function of DStreams is a natural way to perform
+side-effect-inducing operations that don't necessarily transform the data.
+
+*** Implementing the Kafka Producer
+
+See the [[storm-kafka-streaming][post]] from last time for the details of the
+Kafka producer; this has not changed.
+
+*** Implementing the Spark Streaming Application
+
+Diving into the code, here are some of the primary aspects of this project.
+The full source of this test application can be found on
+[[spark-sample-project][Github]].
+
+**** Creating Spark Context, Wiring Transformation Chain
+
+The Spark context, the data source, and the transformations need to be
+defined; then the context needs to be started. This is all accomplished with
+the following code:
+
+#+BEGIN_SRC java
+ SparkConf conf = new SparkConf()
+ .setAppName("Twitter Sentiment Analysis");
+
+ if (args.length > 0)
+ conf.setMaster(args[0]);
+ else
+ conf.setMaster("local[2]");
+
+ JavaStreamingContext ssc = new JavaStreamingContext(
+ conf,
+ new Duration(2000));
+
+ Map<String, Integer> topicMap = new HashMap<String, Integer>();
+ topicMap.put(KAFKA_TOPIC, KAFKA_PARALLELIZATION);
+
+ JavaPairReceiverInputDStream<String, String> messages =
+ KafkaUtils.createStream(
+ ssc,
+ Properties.getString("rts.spark.zkhosts"),
+ "twitter.sentimentanalysis.kafka",
+ topicMap);
+
+ JavaDStream<String> json = messages.map(
+ new Function<Tuple2<String, String>, String>() {
+ public String call(Tuple2<String, String> message) {
+ return message._2();
+ }
+ }
+ );
+
+ JavaPairDStream<Long, String> tweets = json.mapToPair(
+ new TwitterFilterFunction());
+
+ JavaPairDStream<Long, String> filtered = tweets.filter(
+ new Function<Tuple2<Long, String>, Boolean>() {
+ public Boolean call(Tuple2<Long, String> tweet) {
+ return tweet != null;
+ }
+ }
+ );
+
+ JavaDStream<Tuple2<Long, String>> tweetsFiltered = filtered.map(
+ new TextFilterFunction());
+
+ tweetsFiltered = tweetsFiltered.map(
+ new StemmingFunction());
+
+ JavaPairDStream<Tuple2<Long, String>, Float> positiveTweets =
+ tweetsFiltered.mapToPair(new PositiveScoreFunction());
+
+ JavaPairDStream<Tuple2<Long, String>, Float> negativeTweets =
+ tweetsFiltered.mapToPair(new NegativeScoreFunction());
+
+ JavaPairDStream<Tuple2<Long, String>, Tuple2<Float, Float>> joined =
+ positiveTweets.join(negativeTweets);
+
+ JavaDStream<Tuple4<Long, String, Float, Float>> scoredTweets =
+ joined.map(new Function<Tuple2<Tuple2<Long, String>,
+ Tuple2<Float, Float>>,
+ Tuple4<Long, String, Float, Float>>() {
+ public Tuple4<Long, String, Float, Float> call(
+ Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>> tweet)
+ {
+ return new Tuple4<Long, String, Float, Float>(
+ tweet._1()._1(),
+ tweet._1()._2(),
+ tweet._2()._1(),
+ tweet._2()._2());
+ }
+ });
+
+ JavaDStream<Tuple5<Long, String, Float, Float, String>> result =
+ scoredTweets.map(new ScoreTweetsFunction());
+
+ result.foreachRDD(new FileWriter());
+ result.foreachRDD(new HTTPNotifierFunction());
+
+ ssc.start();
+ ssc.awaitTermination();
+#+END_SRC
+
+Some of the more trivial transforms are defined in-line. The others are
+defined in their respective files.
+
+**** Twitter Data Filter / Parser
+
+Parsing Twitter JSON data is one of the first transformations and is
+accomplished with the help of the [[jackson-databind][Jackson Databind]]
+library.
+
+#+BEGIN_SRC java
+ JsonNode root = mapper.readValue(tweet, JsonNode.class);
+ long id;
+ String text;
+ if (root.get("lang") != null &&
+ "en".equals(root.get("lang").textValue()))
+ {
+ if (root.get("id") != null && root.get("text") != null)
+ {
+ id = root.get("id").longValue();
+ text = root.get("text").textValue();
+ return new Tuple2<Long, String>(id, text);
+ }
+ return null;
+ }
+ return null;
+#+END_SRC
+
+The ~mapper~ (~ObjectMapper~) object is defined at the class level so it is not
+recreated /for each/ RDD in the DStream, a minor optimization.
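+
+Sketched out, the surrounding class might look like the following (the exact
+declaration in the project may differ; only the class-level ~mapper~ field is
+the point here):
+
+#+BEGIN_SRC java
+    public class TwitterFilterFunction
+            implements PairFunction<String, Long, String> {
+
+        // constructed once for the function, not once per tweet
+        private static final ObjectMapper mapper = new ObjectMapper();
+
+        public Tuple2<Long, String> call(String tweet) throws Exception {
+            // parsing logic from the snippet above goes here
+            return null;
+        }
+    }
+#+END_SRC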
+
+You may recall, this is essentially the same code as
+[[storm-kafka-streaming][last time]]. The only real difference is that the
+tuple is returned instead of being emitted. Because certain situations (e.g.,
+a non-English tweet or a malformed tweet) return null, the nulls will need to
+be filtered out. Thankfully, Spark provides a simple way to accomplish this:
+
+#+BEGIN_SRC java
+ JavaPairDStream<Long, String> filtered = tweets.filter(
+ new Function<Tuple2<Long, String>, Boolean>() {
+ public Boolean call(Tuple2<Long, String> tweet) {
+ return tweet != null;
+ }
+ }
+ );
+#+END_SRC
+
+**** Text Filtering
+
+As mentioned before, punctuation and other symbols are simply discarded as they
+provide little to no benefit to the classifiers:
+
+#+BEGIN_SRC java
+ String text = tweet._2();
+ text = text.replaceAll("[^a-zA-Z\\s]", "").trim().toLowerCase();
+ return new Tuple2<Long, String>(tweet._1(), text);
+#+END_SRC
+
+Similarly, common words should be discarded as well:
+
+#+BEGIN_SRC java
+ String text = tweet._2();
+ List<String> stopWords = StopWords.getWords();
+ for (String word : stopWords)
+ {
+ text = text.replaceAll("\\b" + word + "\\b", "");
+ }
+ return new Tuple2<Long, String>(tweet._1(), text);
+#+END_SRC
+
+**** Positive and Negative Scoring
+
+Each classifier is defined in its own class. Both classifiers are /very/
+similar in definition.
+
+The positive classifier is primarily defined by:
+
+#+BEGIN_SRC java
+ String text = tweet._2();
+ Set<String> posWords = PositiveWords.getWords();
+ String[] words = text.split(" ");
+ int numWords = words.length;
+ int numPosWords = 0;
+ for (String word : words)
+ {
+ if (posWords.contains(word))
+ numPosWords++;
+ }
+ return new Tuple2<Tuple2<Long, String>, Float>(
+ new Tuple2<Long, String>(tweet._1(), tweet._2()),
+ (float) numPosWords / numWords
+ );
+#+END_SRC
+
+And the negative classifier:
+
+#+BEGIN_SRC java
+ String text = tweet._2();
+ Set<String> negWords = NegativeWords.getWords();
+ String[] words = text.split(" ");
+ int numWords = words.length;
+    int numNegWords = 0;
+    for (String word : words)
+    {
+        if (negWords.contains(word))
+            numNegWords++;
+    }
+    return new Tuple2<Tuple2<Long, String>, Float>(
+        new Tuple2<Long, String>(tweet._1(), tweet._2()),
+        (float) numNegWords / numWords
+ );
+#+END_SRC
+
+Because both classifiers are implemented as separate ~PairFunction~s, a join
+situation is contrived. However, this could /easily/ be defined differently
+such that one classifier is computed, then the next, without ever needing to
+join the two together.
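+
+For illustration only (this is not the project's implementation), a single
+~mapToPair~ could compute both scores at once and avoid the join entirely,
+reusing ~PositiveWords~ and ~NegativeWords~:
+
+#+BEGIN_SRC java
+    JavaPairDStream<Tuple2<Long, String>, Tuple2<Float, Float>> scored =
+        tweetsFiltered.mapToPair(
+            new PairFunction<Tuple2<Long, String>,
+                             Tuple2<Long, String>,
+                             Tuple2<Float, Float>>() {
+                public Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>> call(
+                    Tuple2<Long, String> tweet)
+                {
+                    Set<String> posWords = PositiveWords.getWords();
+                    Set<String> negWords = NegativeWords.getWords();
+                    String[] words = tweet._2().split(" ");
+                    int pos = 0;
+                    int neg = 0;
+                    for (String word : words)
+                    {
+                        if (posWords.contains(word)) pos++;
+                        if (negWords.contains(word)) neg++;
+                    }
+                    // same (tweet, (positive, negative)) shape the join produces
+                    return new Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>>(
+                        tweet,
+                        new Tuple2<Float, Float>(
+                            (float) pos / words.length,
+                            (float) neg / words.length));
+                }
+            });
+#+END_SRC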
+
+**** Joining
+
+It turns out, joining in Spark is very easy to accomplish. So easy, in fact,
+that it can be handled with virtually /no/ code:
+
+#+BEGIN_SRC java
+ JavaPairDStream<Tuple2<Long, String>, Tuple2<Float, Float>> joined =
+ positiveTweets.join(negativeTweets);
+#+END_SRC
+
+But because working with a tuple of nested tuples is unwieldy, transform it
+into a 4-element tuple:
+
+#+BEGIN_SRC java
+ public Tuple4<Long, String, Float, Float> call(
+ Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>> tweet)
+ {
+ return new Tuple4<Long, String, Float, Float>(
+ tweet._1()._1(),
+ tweet._1()._2(),
+ tweet._2()._1(),
+ tweet._2()._2());
+ }
+#+END_SRC
+
+**** Scoring: Declaring Winning Class
+
+Declaring the winning class is a matter of a simple map: compare each class's
+score and take the greater:
+
+#+BEGIN_SRC java
+ String score;
+ if (tweet._3() >= tweet._4())
+ score = "positive";
+ else
+ score = "negative";
+ return new Tuple5<Long, String, Float, Float, String>(
+ tweet._1(),
+ tweet._2(),
+ tweet._3(),
+ tweet._4(),
+ score);
+#+END_SRC
+
+This declarer is optimistic about the neutral case (a tie is scored as
+positive) but is otherwise very straightforward.
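+
+A variant that calls out the neutral case explicitly (purely illustrative)
+would only be a few lines more:
+
+#+BEGIN_SRC java
+    String score;
+    if (tweet._3() > tweet._4())
+        score = "positive";
+    else if (tweet._4() > tweet._3())
+        score = "negative";
+    else
+        score = "neutral";
+    return new Tuple5<Long, String, Float, Float, String>(
+        tweet._1(),
+        tweet._2(),
+        tweet._3(),
+        tweet._4(),
+        score);
+#+END_SRC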
+
+**** Reporting the Results
+
+Finally, the pipeline completes by writing the results to HDFS:
+
+#+BEGIN_SRC java
+ if (rdd.count() <= 0) return null;
+ String path = Properties.getString("rts.spark.hdfs_output_file") +
+ "_" +
+ time.milliseconds();
+ rdd.saveAsTextFile(path);
+#+END_SRC
+
+And by sending a POST request to a NodeJS application:
+
+#+BEGIN_SRC java
+ rdd.foreach(new SendPostFunction());
+#+END_SRC
+
+Where ~SendPostFunction~ is primarily given by:
+
+#+BEGIN_SRC java
+ String webserver = Properties.getString("rts.spark.webserv");
+ HttpClient client = new DefaultHttpClient();
+ HttpPost post = new HttpPost(webserver);
+ String content = String.format(
+ "{\"id\": \"%d\", " +
+ "\"text\": \"%s\", " +
+ "\"pos\": \"%f\", " +
+ "\"neg\": \"%f\", " +
+ "\"score\": \"%s\" }",
+ tweet._1(),
+ tweet._2(),
+ tweet._3(),
+ tweet._4(),
+ tweet._5());
+
+ try
+ {
+ post.setEntity(new StringEntity(content));
+ HttpResponse response = client.execute(post);
+ org.apache.http.util.EntityUtils.consume(response.getEntity());
+ }
+ catch (Exception ex)
+ {
+ Logger LOG = Logger.getLogger(this.getClass());
+ LOG.error("exception thrown while attempting to post", ex);
+ LOG.trace(null, ex);
+ }
+#+END_SRC
+
+Each file written to HDFS /will/ have data in it, but the data written will be
+small. A better batching procedure should be implemented so the files written
+match the HDFS block size.
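+
+One possible approach (a sketch, not what this project does) is to widen the
+batches that reach the writer by windowing the result stream, e.g., flushing
+to HDFS every ten minutes instead of every two seconds:
+
+#+BEGIN_SRC java
+    // non-overlapping 10-minute windows: fewer, larger files per write
+    result.window(new Duration(600000), new Duration(600000))
+        .foreachRDD(new FileWriter());
+#+END_SRC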
+
+Similarly, a POST request is opened /for each/ scored tweet. This can be
+expensive for both the Spark Streaming batch timings and the web server
+receiving the requests. Batching here could similarly improve the overall
+performance of the system.
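+
+Along the same lines, the requests could be grouped per partition so that each
+partition shares one HTTP client (and, ideally, one batched request body); a
+rough sketch, assuming ~foreachPartition~ is available in the Java API in use:
+
+#+BEGIN_SRC java
+    rdd.foreachPartition(
+        new VoidFunction<Iterator<Tuple5<Long, String, Float, Float, String>>>() {
+            public void call(
+                Iterator<Tuple5<Long, String, Float, Float, String>> tweets)
+            {
+                // one client per partition instead of one per tweet
+                HttpClient client = new DefaultHttpClient();
+                while (tweets.hasNext()) {
+                    Tuple5<Long, String, Float, Float, String> tweet = tweets.next();
+                    // build and send the POST as in SendPostFunction, reusing client
+                }
+            }
+        });
+#+END_SRC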
+
+That said, writing these side-effects this way fits very naturally into
+the Spark programming style.
+
+** Summary
+
+Apache Spark, in combination with Apache Kafka, has some amazing potential,
+and not only in the streaming context, but also as a drop-in replacement for
+traditional Hadoop MapReduce. This combination makes it a very good candidate
+for a role in an analytics engine.
+
+Stay tuned, as the next post will be a more in-depth comparison between Apache
+Spark and Apache Storm.
+
+** Related Links / References
+
+- [[spark][Apache Spark]]
+
+- [[state-spark-2014][State of Apache Spark 2014]]
+
+- [[storm-sample-project][Storm Sample Project]]
+
+- [[spark-939][SPARK-939]]
+
+- [[kafka][Apache Kafka]]
+
+- [[storm-kafka-streaming][Real-Time Streaming with Apache Storm and Apache
+ Kafka]]
+
+- [[docker][Docker IO Project Page]]
+
+- [[S3][Amazon S3]]
+
+- [[NFS][Network File System (NFS)]]
+
+- [[YARN][Hadoop YARN]]
+
+- [[Mesos][Apache Mesos]]
+
+- [[spark-streaming][Spark Streaming Programming Guide]]
+
+- [[monad-programming][Monad]]
+
+- [[spark-sql][Spark SQL]]
+
+- [[spark-streaming][Spark Streaming]]
+
+- [[spark-mllib][MLlib]]
+
+- [[spark-graphx][GraphX]]
+
+- [[spark-standalone][Spark Standalone Mode]]
+
+- [[spark-running-yarn][Running on YARN]]
+
+- [[spark-running-mesos][Running on Mesos]]
+
+- [[maven-dependency-hell][Fight Dependency Hell in Maven]]
+
+- [[kafka-producer][Simple Kafka Producer]]
+
+- [[spark-kafka-extern-lib][Spark: External Kafka Library]]
+
+- [[spark-sample-project][Spark Sample Project]]
+
+- [[bag-of-words][Wikipedia: Bag-of-words]]
+
+- [[jackson-databind][Jackson XML Databind Project]]
+
+- [[spark-programming-guide][Spark Programming Guide]]
+
+- [[EC2][Amazon EC2]]
+
+- [[spark-running-ec2][Running Spark on EC2]]
+
+- [[spark-faq][Spark FAQ]]
+
+- [[spark-sql-future][Future of Shark]]
+
+- [[nsdi-12][Resilient Distributed Datasets: A Fault-Tolerant Abstraction for
+ In-Memory Cluster Computing (PDF)]]
+
+- [[hotcloud12][Discretized Streams: An Efficient and Fault-Tolerant Model for
+ Stream Processing on Large Clusters (PDF)]]
+
+- [[lazy-evaluation][Wikipedia: Lazy evaluation]]
+
+- [[data-parallelism][Wikipedia: Data Parallelism]]