author    | kballou <kballou@devnulllabs.io> | 2014-10-11 01:20:54 -0600
committer | kballou <kballou@devnulllabs.io> | 2017-09-02 19:31:44 -0600
commit    | b0ccdf8eb059f54a0e8cfad51466f12b8cd76451 (patch)
tree      | 51664a702772ca797c2e5e94dac3bc479105a611
Wobsite and Blag of KennyBallou
28 files changed, 2605 insertions, 0 deletions
@@ -0,0 +1,438 @@ +Attribution-NonCommercial-ShareAlike 4.0 International + +======================================================================= + +Creative Commons Corporation ("Creative Commons") is not a law firm and +does not provide legal services or legal advice. Distribution of +Creative Commons public licenses does not create a lawyer-client or +other relationship. Creative Commons makes its licenses and related +information available on an "as-is" basis. Creative Commons gives no +warranties regarding its licenses, any material licensed under their +terms and conditions, or any related information. Creative Commons +disclaims all liability for damages resulting from their use to the +fullest extent possible. + +Using Creative Commons Public Licenses + +Creative Commons public licenses provide a standard set of terms and +conditions that creators and other rights holders may use to share +original works of authorship and other material subject to copyright +and certain other rights specified in the public license below. The +following considerations are for informational purposes only, are not +exhaustive, and do not form part of our licenses. + + Considerations for licensors: Our public licenses are + intended for use by those authorized to give the public + permission to use material in ways otherwise restricted by + copyright and certain other rights. Our licenses are + irrevocable. Licensors should read and understand the terms + and conditions of the license they choose before applying it. + Licensors should also secure all rights necessary before + applying our licenses so that the public can reuse the + material as expected. Licensors should clearly mark any + material not subject to the license. This includes other CC- + licensed material, or material used under an exception or + limitation to copyright. 
More considerations for licensors: + wiki.creativecommons.org/Considerations_for_licensors + + Considerations for the public: By using one of our public + licenses, a licensor grants the public permission to use the + licensed material under specified terms and conditions. If + the licensor's permission is not necessary for any reason--for + example, because of any applicable exception or limitation to + copyright--then that use is not regulated by the license. Our + licenses grant only permissions under copyright and certain + other rights that a licensor has authority to grant. Use of + the licensed material may still be restricted for other + reasons, including because others have copyright or other + rights in the material. A licensor may make special requests, + such as asking that all changes be marked or described. + Although not required by our licenses, you are encouraged to + respect those requests where reasonable. More_considerations + for the public: + wiki.creativecommons.org/Considerations_for_licensees + +======================================================================= + +Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International +Public License + +By exercising the Licensed Rights (defined below), You accept and agree +to be bound by the terms and conditions of this Creative Commons +Attribution-NonCommercial-ShareAlike 4.0 International Public License +("Public License"). To the extent this Public License may be +interpreted as a contract, You are granted the Licensed Rights in +consideration of Your acceptance of these terms and conditions, and the +Licensor grants You such rights in consideration of benefits the +Licensor receives from making the Licensed Material available under +these terms and conditions. + + +Section 1 -- Definitions. + + a. 
Adapted Material means material subject to Copyright and Similar + Rights that is derived from or based upon the Licensed Material + and in which the Licensed Material is translated, altered, + arranged, transformed, or otherwise modified in a manner requiring + permission under the Copyright and Similar Rights held by the + Licensor. For purposes of this Public License, where the Licensed + Material is a musical work, performance, or sound recording, + Adapted Material is always produced where the Licensed Material is + synched in timed relation with a moving image. + + b. Adapter's License means the license You apply to Your Copyright + and Similar Rights in Your contributions to Adapted Material in + accordance with the terms and conditions of this Public License. + + c. BY-NC-SA Compatible License means a license listed at + creativecommons.org/compatiblelicenses, approved by Creative + Commons as essentially the equivalent of this Public License. + + d. Copyright and Similar Rights means copyright and/or similar rights + closely related to copyright including, without limitation, + performance, broadcast, sound recording, and Sui Generis Database + Rights, without regard to how the rights are labeled or + categorized. For purposes of this Public License, the rights + specified in Section 2(b)(1)-(2) are not Copyright and Similar + Rights. + + e. Effective Technological Measures means those measures that, in the + absence of proper authority, may not be circumvented under laws + fulfilling obligations under Article 11 of the WIPO Copyright + Treaty adopted on December 20, 1996, and/or similar international + agreements. + + f. Exceptions and Limitations means fair use, fair dealing, and/or + any other exception or limitation to Copyright and Similar Rights + that applies to Your use of the Licensed Material. + + g. License Elements means the license attributes listed in the name + of a Creative Commons Public License. 
The License Elements of this + Public License are Attribution, NonCommercial, and ShareAlike. + + h. Licensed Material means the artistic or literary work, database, + or other material to which the Licensor applied this Public + License. + + i. Licensed Rights means the rights granted to You subject to the + terms and conditions of this Public License, which are limited to + all Copyright and Similar Rights that apply to Your use of the + Licensed Material and that the Licensor has authority to license. + + j. Licensor means the individual(s) or entity(ies) granting rights + under this Public License. + + k. NonCommercial means not primarily intended for or directed towards + commercial advantage or monetary compensation. For purposes of + this Public License, the exchange of the Licensed Material for + other material subject to Copyright and Similar Rights by digital + file-sharing or similar means is NonCommercial provided there is + no payment of monetary compensation in connection with the + exchange. + + l. Share means to provide material to the public by any means or + process that requires permission under the Licensed Rights, such + as reproduction, public display, public performance, distribution, + dissemination, communication, or importation, and to make material + available to the public including in ways that members of the + public may access the material from a place and at a time + individually chosen by them. + + m. Sui Generis Database Rights means rights other than copyright + resulting from Directive 96/9/EC of the European Parliament and of + the Council of 11 March 1996 on the legal protection of databases, + as amended and/or succeeded, as well as other essentially + equivalent rights anywhere in the world. + + n. You means the individual or entity exercising the Licensed Rights + under this Public License. Your has a corresponding meaning. + + +Section 2 -- Scope. + + a. License grant. + + 1. 
Subject to the terms and conditions of this Public License, + the Licensor hereby grants You a worldwide, royalty-free, + non-sublicensable, non-exclusive, irrevocable license to + exercise the Licensed Rights in the Licensed Material to: + + a. reproduce and Share the Licensed Material, in whole or + in part, for NonCommercial purposes only; and + + b. produce, reproduce, and Share Adapted Material for + NonCommercial purposes only. + + 2. Exceptions and Limitations. For the avoidance of doubt, where + Exceptions and Limitations apply to Your use, this Public + License does not apply, and You do not need to comply with + its terms and conditions. + + 3. Term. The term of this Public License is specified in Section + 6(a). + + 4. Media and formats; technical modifications allowed. The + Licensor authorizes You to exercise the Licensed Rights in + all media and formats whether now known or hereafter created, + and to make technical modifications necessary to do so. The + Licensor waives and/or agrees not to assert any right or + authority to forbid You from making technical modifications + necessary to exercise the Licensed Rights, including + technical modifications necessary to circumvent Effective + Technological Measures. For purposes of this Public License, + simply making modifications authorized by this Section 2(a) + (4) never produces Adapted Material. + + 5. Downstream recipients. + + a. Offer from the Licensor -- Licensed Material. Every + recipient of the Licensed Material automatically + receives an offer from the Licensor to exercise the + Licensed Rights under the terms and conditions of this + Public License. + + b. Additional offer from the Licensor -- Adapted Material. + Every recipient of Adapted Material from You + automatically receives an offer from the Licensor to + exercise the Licensed Rights in the Adapted Material + under the conditions of the Adapter's License You apply. + + c. No downstream restrictions. 
You may not offer or impose + any additional or different terms or conditions on, or + apply any Effective Technological Measures to, the + Licensed Material if doing so restricts exercise of the + Licensed Rights by any recipient of the Licensed + Material. + + 6. No endorsement. Nothing in this Public License constitutes or + may be construed as permission to assert or imply that You + are, or that Your use of the Licensed Material is, connected + with, or sponsored, endorsed, or granted official status by, + the Licensor or others designated to receive attribution as + provided in Section 3(a)(1)(A)(i). + + b. Other rights. + + 1. Moral rights, such as the right of integrity, are not + licensed under this Public License, nor are publicity, + privacy, and/or other similar personality rights; however, to + the extent possible, the Licensor waives and/or agrees not to + assert any such rights held by the Licensor to the limited + extent necessary to allow You to exercise the Licensed + Rights, but not otherwise. + + 2. Patent and trademark rights are not licensed under this + Public License. + + 3. To the extent possible, the Licensor waives any right to + collect royalties from You for the exercise of the Licensed + Rights, whether directly or through a collecting society + under any voluntary or waivable statutory or compulsory + licensing scheme. In all other cases the Licensor expressly + reserves any right to collect such royalties, including when + the Licensed Material is used other than for NonCommercial + purposes. + + +Section 3 -- License Conditions. + +Your exercise of the Licensed Rights is expressly made subject to the +following conditions. + + a. Attribution. + + 1. If You Share the Licensed Material (including in modified + form), You must: + + a. retain the following if it is supplied by the Licensor + with the Licensed Material: + + i. 
identification of the creator(s) of the Licensed + Material and any others designated to receive + attribution, in any reasonable manner requested by + the Licensor (including by pseudonym if + designated); + + ii. a copyright notice; + + iii. a notice that refers to this Public License; + + iv. a notice that refers to the disclaimer of + warranties; + + v. a URI or hyperlink to the Licensed Material to the + extent reasonably practicable; + + b. indicate if You modified the Licensed Material and + retain an indication of any previous modifications; and + + c. indicate the Licensed Material is licensed under this + Public License, and include the text of, or the URI or + hyperlink to, this Public License. + + 2. You may satisfy the conditions in Section 3(a)(1) in any + reasonable manner based on the medium, means, and context in + which You Share the Licensed Material. For example, it may be + reasonable to satisfy the conditions by providing a URI or + hyperlink to a resource that includes the required + information. + + 3. If requested by the Licensor, You must remove any of the + information required by Section 3(a)(1)(A) to the extent + reasonably practicable. + + b. ShareAlike. + + In addition to the conditions in Section 3(a), if You Share + Adapted Material You produce, the following conditions also apply. + + 1. The Adapter's License You apply must be a Creative Commons + license with the same License Elements, this version or + later, or a BY-NC-SA Compatible License. + + 2. You must include the text of, or the URI or hyperlink to, the + Adapter's License You apply. You may satisfy this condition + in any reasonable manner based on the medium, means, and + context in which You Share Adapted Material. + + 3. You may not offer or impose any additional or different terms + or conditions on, or apply any Effective Technological + Measures to, Adapted Material that restrict exercise of the + rights granted under the Adapter's License You apply. 
+ + +Section 4 -- Sui Generis Database Rights. + +Where the Licensed Rights include Sui Generis Database Rights that +apply to Your use of the Licensed Material: + + a. for the avoidance of doubt, Section 2(a)(1) grants You the right + to extract, reuse, reproduce, and Share all or a substantial + portion of the contents of the database for NonCommercial purposes + only; + + b. if You include all or a substantial portion of the database + contents in a database in which You have Sui Generis Database + Rights, then the database in which You have Sui Generis Database + Rights (but not its individual contents) is Adapted Material, + including for purposes of Section 3(b); and + + c. You must comply with the conditions in Section 3(a) if You Share + all or a substantial portion of the contents of the database. + +For the avoidance of doubt, this Section 4 supplements and does not +replace Your obligations under this Public License where the Licensed +Rights include other Copyright and Similar Rights. + + +Section 5 -- Disclaimer of Warranties and Limitation of Liability. + + a. UNLESS OTHERWISE SEPARATELY UNDERTAKEN BY THE LICENSOR, TO THE + EXTENT POSSIBLE, THE LICENSOR OFFERS THE LICENSED MATERIAL AS-IS + AND AS-AVAILABLE, AND MAKES NO REPRESENTATIONS OR WARRANTIES OF + ANY KIND CONCERNING THE LICENSED MATERIAL, WHETHER EXPRESS, + IMPLIED, STATUTORY, OR OTHER. THIS INCLUDES, WITHOUT LIMITATION, + WARRANTIES OF TITLE, MERCHANTABILITY, FITNESS FOR A PARTICULAR + PURPOSE, NON-INFRINGEMENT, ABSENCE OF LATENT OR OTHER DEFECTS, + ACCURACY, OR THE PRESENCE OR ABSENCE OF ERRORS, WHETHER OR NOT + KNOWN OR DISCOVERABLE. WHERE DISCLAIMERS OF WARRANTIES ARE NOT + ALLOWED IN FULL OR IN PART, THIS DISCLAIMER MAY NOT APPLY TO YOU. + + b. 
TO THE EXTENT POSSIBLE, IN NO EVENT WILL THE LICENSOR BE LIABLE + TO YOU ON ANY LEGAL THEORY (INCLUDING, WITHOUT LIMITATION, + NEGLIGENCE) OR OTHERWISE FOR ANY DIRECT, SPECIAL, INDIRECT, + INCIDENTAL, CONSEQUENTIAL, PUNITIVE, EXEMPLARY, OR OTHER LOSSES, + COSTS, EXPENSES, OR DAMAGES ARISING OUT OF THIS PUBLIC LICENSE OR + USE OF THE LICENSED MATERIAL, EVEN IF THE LICENSOR HAS BEEN + ADVISED OF THE POSSIBILITY OF SUCH LOSSES, COSTS, EXPENSES, OR + DAMAGES. WHERE A LIMITATION OF LIABILITY IS NOT ALLOWED IN FULL OR + IN PART, THIS LIMITATION MAY NOT APPLY TO YOU. + + c. The disclaimer of warranties and limitation of liability provided + above shall be interpreted in a manner that, to the extent + possible, most closely approximates an absolute disclaimer and + waiver of all liability. + + +Section 6 -- Term and Termination. + + a. This Public License applies for the term of the Copyright and + Similar Rights licensed here. However, if You fail to comply with + this Public License, then Your rights under this Public License + terminate automatically. + + b. Where Your right to use the Licensed Material has terminated under + Section 6(a), it reinstates: + + 1. automatically as of the date the violation is cured, provided + it is cured within 30 days of Your discovery of the + violation; or + + 2. upon express reinstatement by the Licensor. + + For the avoidance of doubt, this Section 6(b) does not affect any + right the Licensor may have to seek remedies for Your violations + of this Public License. + + c. For the avoidance of doubt, the Licensor may also offer the + Licensed Material under separate terms or conditions or stop + distributing the Licensed Material at any time; however, doing so + will not terminate this Public License. + + d. Sections 1, 5, 6, 7, and 8 survive termination of this Public + License. + + +Section 7 -- Other Terms and Conditions. + + a. 
The Licensor shall not be bound by any additional or different + terms or conditions communicated by You unless expressly agreed. + + b. Any arrangements, understandings, or agreements regarding the + Licensed Material not stated herein are separate from and + independent of the terms and conditions of this Public License. + + +Section 8 -- Interpretation. + + a. For the avoidance of doubt, this Public License does not, and + shall not be interpreted to, reduce, limit, restrict, or impose + conditions on any use of the Licensed Material that could lawfully + be made without permission under this Public License. + + b. To the extent possible, if any provision of this Public License is + deemed unenforceable, it shall be automatically reformed to the + minimum extent necessary to make it enforceable. If the provision + cannot be reformed, it shall be severed from this Public License + without affecting the enforceability of the remaining terms and + conditions. + + c. No term or condition of this Public License will be waived and no + failure to comply consented to unless expressly agreed to by the + Licensor. + + d. Nothing in this Public License constitutes or may be interpreted + as a limitation upon, or waiver of, any privileges and immunities + that apply to the Licensor or You, including from the legal + processes of any jurisdiction or authority. + +======================================================================= + +Creative Commons is not a party to its public +licenses. Notwithstanding, Creative Commons may elect to apply one of +its public licenses to material it publishes and in those instances +will be considered the “Licensor.” The text of the Creative Commons +public licenses is dedicated to the public domain under the CC0 Public +Domain Dedication. 
Except for the limited purpose of indicating that
+material is shared under a Creative Commons public license or as
+otherwise permitted by the Creative Commons policies published at
+creativecommons.org/policies, Creative Commons does not authorize the
+use of the trademark "Creative Commons" or any other trademark or logo
+of Creative Commons without its prior written consent including,
+without limitation, in connection with any unauthorized modifications
+to any of its public licenses or any other arrangements,
+understandings, or agreements concerning use of licensed material. For
+the avoidance of doubt, this paragraph does not form part of the
+public licenses.
+
+Creative Commons may be contacted at creativecommons.org.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e3c1e5a
--- /dev/null
+++ b/README.md
@@ -0,0 +1,13 @@
+# blog.kennyballou.com #
+
+Blog content of kennyballou.com powered by [Hugo][1].
+
+## License ##
+
+The content of this repository and blog are released under the terms and
+conditions of the Creative Commons [CC-BY-NC-SA-4.0][2] license. For more
+details, read the [license online][2] or see the attached LICENSE file.
+
+[1]: http://gohugo.io
+
+[2]: https://creativecommons.org/licenses/by-nc-sa/4.0/
diff --git a/config.yaml b/config.yaml
new file mode 100644
index 0000000..8c717cf
--- /dev/null
+++ b/config.yaml
@@ -0,0 +1,14 @@
+---
+baseurl: "https://kennyballou.com"
+MetaDataFormat: "yaml"
+languageCode: "en-us"
+title: "~kballou"
+indexes:
+  tag: "tags"
+  topic: "topics"
+permalinks:
+  blog: /blog/:year/:month/:slug
+copyright: "Creative Commons Attribution"
+author:
+  name: "Kenny Ballou"
+...
diff --git a/content/about-me.markdown b/content/about-me.markdown
new file mode 100644
index 0000000..b0fcb87
--- /dev/null
+++ b/content/about-me.markdown
@@ -0,0 +1,20 @@
+---
+title: "about me"
+keywords: []
+tags: []
+pubdate: "2014-10-10"
+date: "2014-10-10"
+topics: []
+slug: about-me
+---
+
+# About Me #
+
+I'm a born hacker and life-long learner. I enjoy thinking through problems,
+hacking together systems, and learning anything and everything.
+
+Sometimes I `:() { :|:& };:`.
+
+Other times I `import this`.
+
+The one I choose today: `fork: retry: Resource temporarily unavailable`.
diff --git a/content/blog/Spark.markdown b/content/blog/Spark.markdown
new file mode 100644
index 0000000..1b2699e
--- /dev/null
+++ b/content/blog/Spark.markdown
@@ -0,0 +1,739 @@
+---
+title: "Real-Time Streaming with Apache Spark Streaming"
+description: "Overview of Apache Spark and a sample Twitter Sentiment Analysis"
+tags:
+  - "Apache Spark"
+  - "Apache Kafka"
+  - "Apache"
+  - "Java"
+  - "Sentiment Analysis"
+  - "Real-time Streaming"
+  - "ZData Inc."
+date: "2014-08-18"
+categories:
+  - "Apache"
+  - "Development"
+  - "Real-time Systems"
+slug: "real-time-streaming-apache-spark-streaming"
+---
+
+This is the second post in a series on real-time systems tangential to the
+Hadoop ecosystem. [Last time][6], we talked about [Apache Kafka][5] and Apache
+Storm for use in a real-time processing engine. Today, we will be exploring
+Apache Spark (Streaming) as part of a real-time processing engine.
+
+## About Spark ##
+
+[Apache Spark][1] is a general purpose, large-scale processing engine,
+recently fully inducted as an Apache project and currently under very active
+development. As of this writing, Spark is at version 1.0.2, and 1.1 will be
+released some time soon.
+
+Spark is intended to be a drop-in replacement for Hadoop MapReduce, providing
+the benefit of improved performance.
Combine Spark with its related projects
+and libraries -- [Spark SQL (formerly Shark)][14], [Spark Streaming][15],
+[Spark MLlib][16], [GraphX][17], among others -- and a very capable and
+promising processing stack emerges. Spark is capable of reading from HBase,
+Hive, Cassandra, and any HDFS data source, and many external libraries enable
+consuming data from still more sources; e.g., hooking Apache Kafka into Spark
+Streaming is trivial. Further, the Spark Streaming project provides the
+ability to continuously compute transformations on data.
+
+### Resilient Distributed Datasets ###
+
+Apache Spark's primitive type is the Resilient Distributed Dataset (RDD). All
+transformations in Spark -- `map`, `join`, `reduce`, etc. -- revolve around
+this type. RDDs can be created in one of three ways: _parallelizing_
+(distributing a local dataset); reading a stable, external data source, such
+as an HDFS file; or transforming existing RDDs.
+
+In Java, parallelizing may look like:
+
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> distData = sc.parallelize(data);
+
+where `sc` is the Spark context.
+
+Similarly, reading a file from HDFS may look like:
+
+    JavaRDD<String> distFile = sc.textFile("hdfs:///data.txt");
+
+The resiliency of RDDs comes from their [lazy][34] materialization and the
+information required to enable this lazy nature. RDDs are not always fully
+materialized, but they _do_ contain enough information (their lineage) to be
+(re)created from a stable source [[Zaharia et al.][32]].
+
+RDDs are distributed among the participating machines, and RDD
+transformations are coarse-grained -- the same transformation will be applied
+to _every_ element in an RDD. The number of partitions in an RDD is generally
+defined by the locality of the stable source; however, the user may control
+this number via repartitioning.
+
+Another important property to mention: RDDs are actually immutable.
This
+immutability can be illustrated with [Spark's][27] Word Count example:
+
+    JavaRDD<String> file = sc.textFile("hdfs:///data.txt");
+    JavaRDD<String> words = file.flatMap(
+        new FlatMapFunction<String, String>() {
+            public Iterable<String> call(String line) {
+                return Arrays.asList(line.split(" "));
+            }
+        }
+    );
+    JavaPairRDD<String, Integer> pairs = words.mapToPair(
+        new PairFunction<String, String, Integer>() {
+            public Tuple2<String, Integer> call(String word) {
+                return new Tuple2<String, Integer>(word, 1);
+            }
+        }
+    );
+    JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
+        new Function2<Integer, Integer, Integer>() {
+            public Integer call(Integer a, Integer b) { return a + b; }
+        }
+    );
+    counts.saveAsTextFile("hdfs:///data_counted.txt");
+
+This is the canonical word count example, but here is a brief explanation:
+load a file into an RDD, split the words into a new RDD, map the words into
+pairs where each word is given a count (one), then reduce the counts of each
+word by a key, in this case the word itself. Notice that each operation
+(`flatMap`, `mapToPair`, `reduceByKey`) creates a _new_ RDD.
+
+To bring all these properties together: Resilient Distributed Datasets are
+read-only, lazy, distributed sets of elements that can have a chain of
+transformations applied to them. They facilitate resiliency by storing
+lineage graphs of the transformations (to be) applied, and they
+[parallelize][35] the computations by partitioning the data among the
+participating machines.
+
+### Discretized Streams ###
+
+Moving to Spark Streaming, the primitive is still the RDD. However, there is
+another type for encapsulating a continuous stream of data: Discretized
+Streams, or DStreams. DStreams are defined as sequences of RDDs. A DStream is
+created from an input source, such as Apache Kafka, or from the
+transformation of another DStream.
+
+It turns out that programming against DStreams is _very_ similar to
+programming against RDDs.
The same word count code can be slightly modified to create a
+streaming word counter:
+
+    JavaReceiverInputDStream<String> lines =
+        ssc.socketTextStream("localhost", 9999);
+    JavaDStream<String> words = lines.flatMap(
+        new FlatMapFunction<String, String>() {
+            public Iterable<String> call(String line) {
+                return Arrays.asList(line.split(" "));
+            }
+        }
+    );
+    JavaPairDStream<String, Integer> pairs = words.mapToPair(
+        new PairFunction<String, String, Integer>() {
+            public Tuple2<String, Integer> call(String word) {
+                return new Tuple2<String, Integer>(word, 1);
+            }
+        }
+    );
+    JavaPairDStream<String, Integer> counts = pairs.reduceByKey(
+        new Function2<Integer, Integer, Integer>() {
+            public Integer call(Integer a, Integer b) { return a + b; }
+        }
+    );
+    counts.print();
+
+Notice that the only real change from the first example is the return types.
+In the streaming context, transformations work on streams of RDDs; Spark
+handles applying the functions (which work against the data in the RDDs) to
+the RDDs in the current batch/DStream.
+
+Though programming against DStreams is similar, there are indeed some
+differences as well. Chiefly, DStreams also have _stateful_ transformations.
+These include sharing state between batches/intervals and modifying the
+current frame when aggregating over a sliding window.
+
+>The key idea is to treat streaming as a series of short batch jobs, and bring
+>down the latency of these jobs as much as possible. This brings many of the
+>benefits of batch processing models to stream processing, including clear
+>consistency semantics and a new parallel recovery technique...
+[[Zaharia et al.][33]]
+
+### Hadoop Requirements ###
+
+Technically speaking, Apache Spark does [_not_][30] require Hadoop to be
+fully functional. In a cluster setting, however, a means of sharing files
+between tasks needs to be provided. This could be accomplished through
+[S3][8], [NFS][9], or, more typically, HDFS.
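The micro-batch model behind those stateful, windowed transformations can be made concrete without a cluster. The following is a plain-Java sketch, not Spark API: it treats a stream as a sequence of micro-batches and aggregates word counts over a sliding window of the last two batches, which is roughly the bookkeeping Spark Streaming's `reduceByKeyAndWindow` automates (the batch data here is made up for illustration).

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowedCounts {
    // Count words in one micro-batch of lines.
    static Map<String, Integer> countBatch(List<String> batch) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : batch)
            for (String word : line.split(" "))
                counts.merge(word, 1, Integer::sum);
        return counts;
    }

    public static void main(String[] args) {
        // A "stream" as a sequence of micro-batches (illustrative data).
        List<List<String>> batches = Arrays.asList(
            Arrays.asList("spark streams"),
            Arrays.asList("spark kafka"),
            Arrays.asList("storm kafka"));
        Deque<Map<String, Integer>> window = new ArrayDeque<>();
        int windowLength = 2; // keep the last two batches
        for (List<String> batch : batches) {
            window.addLast(countBatch(batch));
            if (window.size() > windowLength)
                window.removeFirst(); // slide: drop the oldest batch
            // Aggregate counts across the batches currently in the window.
            Map<String, Integer> agg = new HashMap<>();
            for (Map<String, Integer> m : window)
                m.forEach((w, c) -> agg.merge(w, c, Integer::sum));
            System.out.println(agg);
        }
    }
}
```

After the third batch, the window holds only batches two and three, so "streams" has aged out while "kafka" counts twice; Spark performs the same slide-and-aggregate per batch interval, with the added machinery of partitioning and lineage.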
+
+### Running Spark Applications ###
+
+Apache Spark applications can run in [standalone mode][18] or be managed by
+[YARN][10] ([Running Spark on YARN][19]), [Mesos][11] ([Running Spark on
+Mesos][20]), and even [EC2][28] ([Running Spark on EC2][29]). Furthermore, if
+running under YARN or Mesos, Spark does not need to be installed to work.
+That is, Spark code can execute on YARN and Mesos clusters without change to
+the cluster.
+
+### Language Support ###
+
+Currently, Apache Spark supports the Scala, Java, and Python programming
+languages, though this post only discusses examples in Java.
+
+### Initial Thoughts ###
+
+Getting away from the idea of directed acyclic graphs (DAGs) may be both a
+bit of a leap and a benefit. Although it is perfectly acceptable to define
+Spark's transformations altogether as a DAG, this can feel awkward when
+developing Spark applications. Describing the transformations as
+[Monadic][13] feels much more natural. Of course, a monad structure fits the
+DAG analogy quite well, especially when considered in some of the physical
+analogies such as assembly lines.
+
+Java's, and consequently Spark's, type strictness was an initial hurdle to
+get accustomed to. But overall, this is good. It means the compiler will
+catch a lot of issues with transformations early.
+
+Depending on Scala's `Tuple[\d]` classes feels second-class, but this is only
+a minor tedium. It's too bad current versions of Java don't have good classes
+for this common structure.
+
+YARN and Mesos integration is a very nice benefit, as it allows full-stack
+analytics without oversubscribing clusters. Furthermore, it gives the ability
+to add to existing infrastructure without overloading the developers and
+system administrators with _yet another_ computational suite and/or resource
+manager.
+
+On the negative side of things, dependency hell can creep into Spark projects.
+Your project and Spark (and possibly Spark's dependencies) may depend on a
+common artifact. If the versions don't [converge][21], many subtle problems
+can emerge. There is an [experimental configuration option][4] to help
+alleviate this problem; however, for me, it caused more problems than it
+solved.
+
+## Test Project: Twitter Stream Sentiment Analysis ##
+
+To really test Spark (Streaming), a Twitter Sentiment Analysis project was
+developed. It's almost a direct port of the [Storm code][3]. Although there
+is an external library for hooking Spark directly into Twitter, Kafka is used
+so that a more precise comparison between Spark and Storm can be made.
+
+When the processing is finished, the data are written to HDFS and posted to a
+simple NodeJS application.
+
+### Setup ###
+
+The setup is the same as [last time][6]: a five-node Vagrant virtual cluster,
+with each node running 64-bit CentOS 6.5, given 1 core, and 1024MB of RAM.
+Every node is running HDFS (datanode), YARN worker nodes (nodemanager),
+ZooKeeper, and Kafka. The first node, `node0`, is the namenode and resource
+manager. `node0` is also running a [Docker][7] container with a NodeJS
+application for reporting purposes.
+
+### Application Overview ###
+
+This project follows a very similar process structure as the Storm Topology
+from last time.
+
+![Sentiment Analysis Topology][satimg]
+
+However, each node in the above graph is actually a transformation on the
+current DStream and not an individual process (or group of processes).
+
+This test project similarly uses the same [simple Kafka producer][22]
+developed last time. This Kafka producer will be how data are ingested by the
+system.
+
+[A lot of this overview will be a rehashing of last time.]
+
+#### Kafka Receiver Stream ####
+
+The data processed is received from a Kafka stream, implemented via the
+[external Kafka][23] library. This process simply creates a connection to the
+Kafka broker(s), consuming messages from the given set of topics.

##### Stripping Kafka Message IDs #####

It turns out the messages from Kafka are returned as tuples, more specifically
pairs, of the message ID and the message content. Before continuing, the
message ID is stripped and the Twitter JSON data is passed down the pipeline.

#### Twitter Data JSON Parsing ####

As was the case last time, the important parts (tweet ID, tweet text, and
language code) need to be extracted from the JSON. Furthermore, this project
only parses English tweets; non-English tweets are filtered out at this stage.

#### Filtering and Stemming ####

Many tweets contain messy or otherwise unnecessary characters and punctuation
that can be safely ignored. Moreover, there may also be many common words that
cannot be reliably scored either positively or negatively. At this stage, these
symbols and _stop words_ are filtered out.

#### Classifiers ####

Both the Positive classifier and the Negative classifier are implemented in
separate `map` transformations. The implementation of both follows the
[Bag-of-words][25] model.

#### Joining and Scoring ####

Because the classifiers are computed separately, a join is contrived: the next
step is to join the classifier scores together and actually declare a winner.
It turns out this is quite trivial to do in Spark.

#### Reporting: HDFS and HTTP POST ####

Finally, once the tweets are joined and scored, the scores need to be reported.
This is accomplished by writing the final tuples to HDFS and posting a JSON
object of each tuple to a simple NodeJS application.

This process turned out to not be as awkward as was the case with Storm. The
`foreachRDD` function of DStreams is a natural way to perform side-effect
inducing operations that don't necessarily transform the data.

### Implementing the Kafka Producer ###

See the [post][6] from last time for the details of the Kafka producer; this
has not changed.

### Implementing the Spark Streaming Application ###

Diving into the code, here are some of the primary aspects of this project. The
full source of this test application can be found on [Github][24].

#### Creating Spark Context, Wiring Transformation Chain ####

The Spark context, the data source, and the transformations need to be defined;
then, the context needs to be started. This is all accomplished with the
following code:

    SparkConf conf = new SparkConf()
        .setAppName("Twitter Sentiment Analysis");

    if (args.length > 0)
        conf.setMaster(args[0]);
    else
        conf.setMaster("local[2]");

    JavaStreamingContext ssc = new JavaStreamingContext(
        conf,
        new Duration(2000));

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(KAFKA_TOPIC, KAFKA_PARALLELIZATION);

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(
            ssc,
            Properties.getString("rts.spark.zkhosts"),
            "twitter.sentimentanalysis.kafka",
            topicMap);

    JavaDStream<String> json = messages.map(
        new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        }
    );

    JavaPairDStream<Long, String> tweets = json.mapToPair(
        new TwitterFilterFunction());

    JavaPairDStream<Long, String> filtered = tweets.filter(
        new Function<Tuple2<Long, String>, Boolean>() {
            public Boolean call(Tuple2<Long, String> tweet) {
                return tweet != null;
            }
        }
    );

    JavaDStream<Tuple2<Long, String>> tweetsFiltered = filtered.map(
        new TextFilterFunction());

    tweetsFiltered = tweetsFiltered.map(
        new StemmingFunction());

    JavaPairDStream<Tuple2<Long, String>, Float> positiveTweets =
        tweetsFiltered.mapToPair(new PositiveScoreFunction());

    JavaPairDStream<Tuple2<Long, String>, Float> negativeTweets =
        tweetsFiltered.mapToPair(new NegativeScoreFunction());

    JavaPairDStream<Tuple2<Long, String>, Tuple2<Float, Float>> joined =
        positiveTweets.join(negativeTweets);

    JavaDStream<Tuple4<Long, String, Float, Float>> scoredTweets =
        joined.map(new Function<Tuple2<Tuple2<Long, String>,
                                       Tuple2<Float, Float>>,
                                Tuple4<Long, String, Float, Float>>() {
            public Tuple4<Long, String, Float, Float> call(
                Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>> tweet)
            {
                return new Tuple4<Long, String, Float, Float>(
                    tweet._1()._1(),
                    tweet._1()._2(),
                    tweet._2()._1(),
                    tweet._2()._2());
            }
        });

    JavaDStream<Tuple5<Long, String, Float, Float, String>> result =
        scoredTweets.map(new ScoreTweetsFunction());

    result.foreachRDD(new FileWriter());
    result.foreachRDD(new HTTPNotifierFunction());

    ssc.start();
    ssc.awaitTermination();

Some of the more trivial transforms are defined in-line. The others are defined
in their respective files.

#### Twitter Data Filter / Parser ####

Parsing Twitter JSON data is one of the first transformations and is
accomplished with the help of the [Jackson Databind][26] library.

    JsonNode root = mapper.readValue(tweet, JsonNode.class);
    long id;
    String text;
    if (root.get("lang") != null &&
        "en".equals(root.get("lang").textValue()))
    {
        if (root.get("id") != null && root.get("text") != null)
        {
            id = root.get("id").longValue();
            text = root.get("text").textValue();
            return new Tuple2<Long, String>(id, text);
        }
        return null;
    }
    return null;

The `mapper` (`ObjectMapper`) object is defined at the class level so it is not
recreated _for each_ RDD in the DStream, a minor optimization.

You may recall that this is essentially the same code as [last time][6]; the
only real difference is that the tuple is returned instead of being emitted.
Because certain situations (e.g., non-English tweet, malformed tweet) return
null, the nulls will need to be filtered out.
Thankfully, Spark provides a simple way to accomplish this:

    JavaPairDStream<Long, String> filtered = tweets.filter(
        new Function<Tuple2<Long, String>, Boolean>() {
            public Boolean call(Tuple2<Long, String> tweet) {
                return tweet != null;
            }
        }
    );

#### Text Filtering ####

As mentioned before, punctuation and other symbols are simply discarded, as
they provide little to no benefit to the classifiers:

    String text = tweet._2();
    text = text.replaceAll("[^a-zA-Z\\s]", "").trim().toLowerCase();
    return new Tuple2<Long, String>(tweet._1(), text);

Similarly, common words should be discarded as well:

    String text = tweet._2();
    List<String> stopWords = StopWords.getWords();
    for (String word : stopWords)
    {
        text = text.replaceAll("\\b" + word + "\\b", "");
    }
    return new Tuple2<Long, String>(tweet._1(), text);

#### Positive and Negative Scoring ####

Each classifier is defined in its own class. Both classifiers are _very_
similar in definition.

The positive classifier is primarily defined by:

    String text = tweet._2();
    Set<String> posWords = PositiveWords.getWords();
    String[] words = text.split(" ");
    int numWords = words.length;
    int numPosWords = 0;
    for (String word : words)
    {
        if (posWords.contains(word))
            numPosWords++;
    }
    return new Tuple2<Tuple2<Long, String>, Float>(
        new Tuple2<Long, String>(tweet._1(), tweet._2()),
        (float) numPosWords / numWords
    );

And the negative classifier:

    String text = tweet._2();
    Set<String> negWords = NegativeWords.getWords();
    String[] words = text.split(" ");
    int numWords = words.length;
    int numNegWords = 0;
    for (String word : words)
    {
        if (negWords.contains(word))
            numNegWords++;
    }
    return new Tuple2<Tuple2<Long, String>, Float>(
        new Tuple2<Long, String>(tweet._1(), tweet._2()),
        (float) numNegWords / numWords
    );

Because both are implemented as a `PairFunction`, a join situation is
contrived.
However, this could _easily_ be defined differently such that one classifier is
computed, then the next, without ever needing to join the two together.

#### Joining ####

It turns out that joining in Spark is very easy to accomplish. So easy, in
fact, that it can be handled with virtually _no_ code:

    JavaPairDStream<Tuple2<Long, String>, Tuple2<Float, Float>> joined =
        positiveTweets.join(negativeTweets);

But because working with a tuple of nested tuples is unwieldy, transform it to
a four-element tuple:

    public Tuple4<Long, String, Float, Float> call(
        Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>> tweet)
    {
        return new Tuple4<Long, String, Float, Float>(
            tweet._1()._1(),
            tweet._1()._2(),
            tweet._2()._1(),
            tweet._2()._2());
    }

#### Scoring: Declaring Winning Class ####

Declaring the winning class is a matter of a simple map, comparing each class's
score and taking the greatest:

    String score;
    if (tweet._3() >= tweet._4())
        score = "positive";
    else
        score = "negative";
    return new Tuple5<Long, String, Float, Float, String>(
        tweet._1(),
        tweet._2(),
        tweet._3(),
        tweet._4(),
        score);

This declarer is optimistic in the neutral case (a tie is declared positive)
but is otherwise very straightforward.
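The whole classification-and-scoring path above can be exercised outside of
Spark entirely. Below is a minimal, plain-Java sketch of the same logic, using
tiny, hypothetical word lists in place of the project's `PositiveWords` and
`NegativeWords` classes:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ScoreDemo {
    // Tiny, made-up stand-ins for the project's word lists.
    static final Set<String> POS =
        new HashSet<String>(Arrays.asList("great", "love"));
    static final Set<String> NEG =
        new HashSet<String>(Arrays.asList("hate", "awful"));

    // The same bag-of-words ratio computed by the classifier functions.
    static float ratio(String text, Set<String> vocab) {
        String[] words = text.split(" ");
        int hits = 0;
        for (String word : words)
            if (vocab.contains(word))
                hits++;
        return (float) hits / words.length;
    }

    public static void main(String[] args) {
        // Same cleanup as the text filtering step: strip symbols, lowercase.
        String text = "I LOVE this great, great API!!!"
            .replaceAll("[^a-zA-Z\\s]", "").trim().toLowerCase();
        float pos = ratio(text, POS);  // 3 of 6 words -> 0.5
        float neg = ratio(text, NEG);  // 0 of 6 words -> 0.0
        // Ties go to "positive", exactly as in the declarer above.
        String winner = pos >= neg ? "positive" : "negative";
        System.out.println(winner);    // prints "positive"
    }
}
```

This mirrors the pipeline's behavior on a single tweet; in the actual
application, each of these steps runs as a separate DStream transformation.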

#### Reporting the Results ####

Finally, the pipeline completes with writing the results to HDFS:

    if (rdd.count() <= 0) return null;
    String path = Properties.getString("rts.spark.hdfs_output_file") +
        "_" +
        time.milliseconds();
    rdd.saveAsTextFile(path);

And sending a POST request to a NodeJS application:

    rdd.foreach(new SendPostFunction());

Where `SendPostFunction` is primarily given by:

    String webserver = Properties.getString("rts.spark.webserv");
    HttpClient client = new DefaultHttpClient();
    HttpPost post = new HttpPost(webserver);
    String content = String.format(
        "{\"id\": \"%d\", " +
        "\"text\": \"%s\", " +
        "\"pos\": \"%f\", " +
        "\"neg\": \"%f\", " +
        "\"score\": \"%s\" }",
        tweet._1(),
        tweet._2(),
        tweet._3(),
        tweet._4(),
        tweet._5());

    try
    {
        post.setEntity(new StringEntity(content));
        HttpResponse response = client.execute(post);
        org.apache.http.util.EntityUtils.consume(response.getEntity());
    }
    catch (Exception ex)
    {
        Logger LOG = Logger.getLogger(this.getClass());
        LOG.error("exception thrown while attempting to post", ex);
        LOG.trace(null, ex);
    }

Each file written to HDFS _will_ have data in it, but the data written will be
small. A better batching procedure should be implemented so the files written
match the HDFS block size.

Similarly, a POST request is opened _for each_ scored tweet. This can be
expensive for both the Spark Streaming batch timings and the web server
receiving the requests. Batching here could similarly improve the overall
performance of the system.

That said, writing these side-effects this way fits very naturally into the
Spark programming style.

## Summary ##

Apache Spark, in combination with Apache Kafka, has some amazing potential, not
only in the streaming context, but also as a drop-in replacement for
traditional Hadoop MapReduce. This combination makes it a very good candidate
for a part in an analytics engine.

Stay tuned, as the next post will be a more in-depth comparison between Apache
Spark and Apache Storm.

## Related Links / References ##

[satimg]: https://localhost/media/SentimentAnalysisTopology.png

[1]: http://spark.apache.org/

* [Apache Spark][1]

[2]: http://inside-bigdata.com/2014/07/15/theres-spark-theres-fire-state-apache-spark-2014/

* [State of Apache Spark 2014][2]

[3]: https://github.com/zdata-inc/StormSampleProject

* [Storm Sample Project][3]

[4]: https://issues.apache.org/jira/browse/SPARK-939

* [SPARK-939][4]

[5]: http://kafka.apache.org

* [Apache Kafka][5]

[6]: http://www.zdatainc.com/2014/07/real-time-streaming-apache-storm-apache-kafka/

* [Real-Time Streaming with Apache Storm and Apache Kafka][6]

[7]: http://www.docker.io/

* [Docker IO Project Page][7]

[8]: http://aws.amazon.com/s3/

* [Amazon S3][8]

[9]: http://en.wikipedia.org/wiki/Network_File_System

* [Network File System (NFS)][9]

[10]: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html

* [Hadoop YARN][10]

[11]: http://mesos.apache.org

* [Apache Mesos][11]

[12]: http://spark.apache.org/docs/latest/streaming-programming-guide.html

* [Spark Streaming Programming Guide][12]

[13]: http://en.wikipedia.org/wiki/Monad_(functional_programming)

* [Monad][13]

[14]: http://spark.apache.org/sql/

* [Spark SQL][14]

[15]: http://spark.apache.org/streaming/

* [Spark Streaming][15]

[16]: http://spark.apache.org/mllib/

* [MLlib][16]

[17]: http://spark.apache.org/graphx/

* [GraphX][17]

[18]: http://spark.apache.org/docs/latest/spark-standalone.html

* [Spark Standalone Mode][18]

[19]: http://spark.apache.org/docs/latest/running-on-yarn.html

* [Running on YARN][19]

[20]: http://spark.apache.org/docs/latest/running-on-mesos.html

* [Running on Mesos][20]

[21]: http://cupofjava.de/blog/2013/02/01/fight-dependency-hell-in-maven/

* [Fight Dependency Hell in
Maven][21] + +[22]: https://github.com/zdata-inc/SimpleKafkaProducer + +* [Simple Kafka Producer][22] + +[23]: https://github.com/apache/spark/tree/master/external/kafka + +* [Spark: External Kafka Library][23] + +[24]: https://github.com/zdata-inc/SparkSampleProject + +* [Spark Sample Project][24] + +[25]: http://en.wikipedia.org/wiki/Bag-of-words_model + +* [Wikipedia: Bag-of-words][25] + +[26]: https://github.com/FasterXML/jackson-databind + +* [Jackson XML Databind Project][26] + +[27]: http://spark.apache.org/docs/latest/programming-guide.html + +* [Spark Programming Guide][27] + +[28]: http://aws.amazon.com/ec2/ + +* [Amazon EC2][28] + +[29]: http://spark.apache.org/docs/latest/ec2-scripts.html + +* [Running Spark on EC2][29] + +[30]: http://spark.apache.org/faq.html + +* [Spark FAQ][30] + +[31]: http://databricks.com/blog/2014/03/26/spark-sql-manipulating-structured-data-using-spark-2.html + +* [Future of Shark][31] + +[32]: https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf + +* [Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing (PDF)][32] + +[33]: https://www.usenix.org/system/files/conference/hotcloud12/hotcloud12-final28.pdf + +* [Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters (PDF)][33] + +[34]: http://en.wikipedia.org/wiki/Lazy_evaluation + +* [Wikipedia: Lazy evaluation][34] + +[35]: http://en.wikipedia.org/wiki/Data_parallelism + +* [Wikipedia: Data Parallelism][35] diff --git a/content/blog/Storm-vs-Spark.markdown b/content/blog/Storm-vs-Spark.markdown new file mode 100644 index 0000000..887b60c --- /dev/null +++ b/content/blog/Storm-vs-Spark.markdown @@ -0,0 +1,457 @@ +--- +title: "Apache Storm and Apache Spark Streaming" +description: "Comparison of Apache Storm and Apache Spark Streaming" +tags: + - "Apache Storm" + - "Apache Spark" + - "Apache" + - "Real-time Streaming" + - "ZData Inc." 
date: "2014-09-08"
categories:
  - "Apache"
  - "Development"
  - "Real-time Systems"
slug: "apache-storm-and-apache-spark"
---

This is the last post in the series on real-time systems. In the [first
post][3] we discussed [Apache Storm][1] and [Apache Kafka][5]. In the [second
post][4] we discussed [Apache Spark (Streaming)][2]. In both posts we examined
a small Twitter Sentiment Analysis program. Today, we will be reviewing both
systems: how they compare and how they contrast.

The intention is not to cast judgment over one project or the other, but rather
to exposit the differences and similarities. Any judgments made, subtle or not,
are mistakes in exposition and/ or organization and are not actual endorsements
of either project.

## Apache Storm ##

"Storm is a distributed real-time computation system" [[1][1]]. Apache Storm is
a [task parallel][7] continuous computational engine. It defines its workflows
in Directed Acyclic Graphs (DAG's) called "topologies". These topologies run
until shut down by the user or until encountering an unrecoverable failure.

Storm does not natively run on top of typical Hadoop clusters; it uses
[Apache ZooKeeper][8] and its own master/ minion worker processes to
coordinate topologies, master and worker state, and the message guarantee
semantics. That said, both [Yahoo!][9] and [Hortonworks][10] are working on
providing libraries for running Storm topologies on top of Hadoop 2.x YARN
clusters. Furthermore, Storm can run on top of the [Mesos][11] scheduler as
well, [natively][12] and with help from the [Marathon][13] framework.

Regardless, Storm can certainly still consume files from HDFS and/ or
write files to HDFS.

## Apache Spark (Streaming) ##

"Apache Spark is a fast and general purpose engine for large-scale data
processing" [[2][2]]. [Apache Spark][2] is a [data parallel][6] general purpose
batch processing engine.
Workflows are defined in a style similar to and reminiscent of MapReduce;
however, Spark is much more capable than traditional Hadoop MapReduce. Apache
Spark has its Streaming API project that allows for continuous processing via
short interval batches. Similar to Storm, Spark Streaming jobs run until shut
down by the user or until encountering an unrecoverable failure.

Apache Spark does not itself require Hadoop to operate. However, its data
parallel paradigm requires a shared filesystem for optimal use of stable data.
The stable source can be [S3][14], [NFS][15], or, more typically, [HDFS][16].

Executing Spark applications does not _require_ Hadoop YARN. Spark has its own
standalone master/ server processes. However, it is common to run Spark
applications using YARN containers. Furthermore, Spark can also run on Mesos
clusters.

## Development ##

As of this writing, Apache Spark is a full, top level Apache project, whereas
Apache Storm is currently undergoing incubation. Moreover, the latest stable
version of Apache Storm is `0.9.2` and the latest stable version of Apache
Spark is `1.0.2` (with `1.1.0` to be released in the coming weeks). Of course,
as the Apache Incubation reminder states, this does not strictly reflect the
stability or completeness of either project. It is, however, a reflection of
the state of the communities. Apache Spark's operations and processes are
endorsed by the [Apache Software Foundation][27]. Apache Storm is working on
stabilizing its community and development process.

Spark's `1.x` version does state that the API has stabilized and will not
undergo major changes that undermine backward compatibility. Implicitly, Storm
has no guaranteed stability in its API; however, it is [running in production
for many different companies][34].

### Implementation Language ###

Both Apache Spark and Apache Storm are implemented in JVM based languages:
[Scala][19] and [Clojure][20], respectively.

Scala is a functional meets object-oriented language. In other words, the
language carries ideas from both the functional world and the object-oriented
world. This yields an interesting mix of code reusability, extensibility, and
higher-order functions.

Clojure is a dialect of [Lisp][21] targeting the JVM, carrying the Lisp
philosophy of code-as-data and providing the rich macro system typical of Lisp
languages. Clojure is predominantly functional in nature; however, if state or
side-effects are required, they are facilitated with a transactional memory
model, aiding in making multi-threaded applications consistent and safe.

#### Message Passing Layer ####

Until version `0.9.x`, Storm was using the Java library [JZMQ][22] for
[ZeroMQ][23] messages. However, Storm has since moved the default messaging
layer to [Netty][24] with efforts from [Yahoo!][25]. Although Netty is now
used by default, users can still use ZeroMQ, if desired, since the migration
to Netty was also intended to make the message layer pluggable.

Spark, on the other hand, uses a combination of [Netty][24] and [Akka][26] for
distributing messages throughout the executors.

### Commit Velocity ###

As a reminder, these data are included not to cast judgment on one project or
the other, but rather to exposit the fluidness of each project. The dynamics of
both projects can be used as an argument for or against either, depending on
application requirements. If rigid stability is a strong requirement, arguing
for a slower commit velocity may be appropriate.

The following statistics were sourced from the graphs at
[GitHub](https://github.com/) and computed with [this script][38].

#### Spark Commit Velocity ####

Examining the graphs from
[GitHub](https://github.com/apache/spark/graphs/commit-activity), over the last
month (as of this writing), there have been over 330 commits. The previous
month had about 340.

#### Storm Commit Velocity ####

Again examining the commit graphs from
[GitHub](https://github.com/apache/incubator-storm/graphs/commit-activity),
over the last month (as of this writing), there have been over 70 commits. The
month prior had over 130.

### Issue Velocity ###

Sourcing the summary charts from JIRA, we can see that Spark clearly has a huge
volume of issues reported and closed in the last 30 days; Storm has roughly an
order of magnitude fewer.

Spark Open and Closed JIRA Issues (last 30 days):

[![Spark JIRA Issues][spark_jira_issues]][18]

Storm Open and Closed JIRA Issues (last 30 days):

[![Storm JIRA Issues][storm_jira_issues]][17]

### Contributor/ Community Size ###

#### Storm Contributor Size ####

Sourcing the reports from
[GitHub](https://github.com/apache/incubator-storm/graphs/contributors), Storm
has over 100 contributors. This number, though, is just the number of unique
people who have committed at least one patch.

Over the last 60 days, Storm has seen 34 unique contributors and 16 over the
last 30.

#### Spark Contributor Size ####

Similarly sourcing the reports from [GitHub](https://github.com/apache/spark),
Spark has roughly 280 contributors. A similar note as before must be made about
this number: it is the number of contributors with at least one patch to the
project.

Apache Spark has had over 140 contributors over the last 60 days and 94 over
the last 30 days.

## Development Friendliness ##

### Developing for Storm ###

* Describing the process structure with DAG's feels natural to the
  [processing model][7]. Each node in the graph will transform the data in a
  certain way, and the process continues, possibly disjointly.

* Storm tuples, the data passed between nodes in the DAG, have a very natural
  interface. However, this comes at a cost to compile-time type safety.
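The compile-time cost mentioned in the last bullet can be illustrated without
Storm itself: a Storm-style tuple is essentially a positional list of
`Object`s, so a mistyped read compiles fine and only fails at runtime. A
plain-Java sketch of that failure mode (the field values are made up):

```java
import java.util.Arrays;
import java.util.List;

public class TupleDemo {
    public static void main(String[] args) {
        // Storm-style: positional and untyped; casts are checked at runtime.
        List<Object> stormStyle = Arrays.asList(1234L, "some tweet text");

        String text = (String) stormStyle.get(1);  // fine
        System.out.println(text);

        try {
            // Compiles, but the element at index 0 is a Long, not a String.
            String oops = (String) stormStyle.get(0);
            System.out.println(oops);
        } catch (ClassCastException e) {
            System.out.println("only caught at runtime");
        }
        // A Spark-style Tuple2<Long, String> makes the same mistake a
        // compile error, at the price of more verbose generics.
    }
}
```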

### Developing for Spark ###

* Spark's monadic expression of transformations over the data similarly feels
  natural in this [processing model][6]; this falls in line with the idea
  that RDD's are lazy and maintain transformation lineages, rather than
  actualized results.

* Spark's use of Scala Tuples can feel awkward in Java, and this awkwardness
  is only exacerbated by the nesting of generic types. However, this
  awkwardness does come with the benefit of compile-time type checks.

  - Furthermore, until Java 1.8, anonymous functions are inherently
    awkward.

  - This is probably a non-issue if using Scala.

## Installation / Administration ##

Installation of both Apache Spark and Apache Storm is relatively
straightforward. Spark may be simpler in some regards, however, since it
technically does not _need_ to be installed to function on YARN or Mesos
clusters; the Spark application just requires the Spark assembly be present in
the `CLASSPATH`.

Storm, on the other hand, requires ZooKeeper to be properly installed and
running, on top of the regular Storm binaries that must be installed.
Furthermore, like ZooKeeper, Storm should run under [supervision][35];
installation of a supervisor service, e.g., [supervisord][28], is recommended.

With respect to installation, supporting projects like Apache Kafka are out of
scope and have no impact on the installation of either Storm or Spark.

## Processing Models ##

Comparing Apache Storm and Apache Spark's Streaming turns out to be a bit
challenging. One is a true stream processing engine that can do
micro-batching; the other is a batch processing engine which micro-batches,
but cannot perform streaming in the strictest sense. Furthermore, the
difference between streaming and batching isn't exactly subtle; these are
fundamentally different computing ideas.

### Batch Processing ###

[Batch processing][31] is the familiar concept of processing data en masse. The
batch size could be small or very large. This is the processing model of the
core Spark library.

Batch processing excels at processing _large_ amounts of stable, existing data.
However, it generally incurs high latency and is completely unsuitable for
processing data as it arrives.

### Event-Stream Processing ###

[Stream processing][32] is a _one-at-a-time_ processing model; a datum is
processed as it arrives. The core Storm library follows this processing model.

Stream processing excels at computing transformations as data are ingested,
with sub-second latencies. However, with stream processing, it is incredibly
difficult to process stable data efficiently.

### Micro-Batching ###

Micro-batching is a special case of batch processing where the batch size is
orders of magnitude smaller. Spark Streaming operates in this manner, as does
the Storm [Trident API][33].

Micro-batching seems to be a nice mix between batching and streaming. However,
micro-batching incurs a latency cost. If sub-second latency is paramount,
micro-batching will typically not suffice. On the other hand, micro-batching
trivially gives stateful computation, making [windowing][37] an easy task.

## Fault-Tolerance / Message Guarantees ##

As a result of each project's fundamentally different processing models, the
fault-tolerance and message guarantees are handled differently.

### Delivery Semantics ###

Before diving into each project's fault-tolerance and message guarantees, here
are the common delivery semantics:

* At most once: messages may be lost but never redelivered.

* At least once: messages will never be lost but may be redelivered.

* Exactly once: messages are never lost and never redelivered; perfect
  message delivery.

### Apache Storm ###

To provide fault-tolerant messaging, Storm has to keep track of each and every
record.
By default, this is done with at least once delivery semantics.
Storm can also be configured to provide at most once and exactly once
semantics. The delivery semantics offered by Storm can incur latency costs; if
data loss in the stream is acceptable, at most once delivery will improve
performance.

### Apache Spark Streaming ###

The resiliency built into Spark RDD's and the micro-batching yield a trivial
mechanism for providing fault-tolerance and message delivery guarantees. That
is, since Spark Streaming is just small-scale batching, exactly once delivery
is a trivial result for each batch; this is the _only_ delivery semantic
available to Spark. However, some failure scenarios of Spark Streaming degrade
to at least once delivery.

## Applicability ##

### Apache Storm ###

Some areas where Storm excels include near real-time analytics, natural
language processing, data normalization, and [ETL][36] transformations. It
also stands apart from traditional MapReduce and other coarse-grained
technologies by yielding fine-grained transformations, allowing very flexible
processing topologies.

### Apache Spark Streaming ###

Spark has an excellent model for performing iterative machine learning and
interactive analytics. But Spark also excels in some similar areas of Storm,
including near real-time analytics and ingestion.

## Final Thoughts ##

Generally, the requirements will dictate the choice. However, here are some
major points to consider when choosing the right tool:

* Latency: Is the performance of the streaming application paramount? Storm
  can give sub-second latency much more easily and with fewer restrictions
  than Spark Streaming.

* Development Cost: Is it desired to have similar code bases for batch
  processing _and_ stream processing? With Spark, batching and streaming are
  _very_ similar. Storm, however, departs dramatically from the MapReduce
  paradigm.

* Message Delivery Guarantees: Is there high importance on processing _every_
  single record, or is some nominal amount of data loss acceptable?
  Disregarding everything else, Spark trivially yields perfect, exactly once
  message delivery. Storm can provide all three delivery semantics, but
  perfect exactly once message delivery requires more effort to properly
  achieve.

* Process Fault Tolerance: Is high-availability of primary concern? Both
  systems actually handle fault-tolerance of this kind really well and in
  relatively similar ways.

  - Production Storm clusters will run Storm processes under
    [supervision][35]; if a process fails, the supervisor process will
    restart it automatically. State management is handled through
    ZooKeeper. Processes restarting will reread the state from ZooKeeper in
    an attempt to rejoin the cluster.

  - Spark handles restarting workers via the resource manager: YARN, Mesos,
    or its standalone manager. Spark's standalone resource manager handles
    master node failure with standby masters and ZooKeeper. Or, this can be
    handled more primitively with just local filesystem state checkpointing,
    which is not typically recommended for production environments.

Both Apache Spark Streaming and Apache Storm are great solutions that solve the
streaming ingestion and transformation problem. Either system can be a great
choice for part of an analytics stack. Choosing the right one is simply a
matter of answering the above questions.
+ +## References ## + +[spark_jira_issues]: https://localhost/media/spark_issues_chart.png + +[storm_jira_issues]: https://localhost/media/storm_issues_chart.png + +[1]: http://storm.incubator.apache.org/documentation/Home.html + +* [Apache Storm Home Page][1] + +[2]: http://spark.apache.org + +* [Apache Spark][2] + +[3]: http://www.zdatainc.com/2014/07/real-time-streaming-apache-storm-apache-kafka/ + +* [Real Time Streaming with Apache Storm and Apache Kafka][3] + +[4]: http://www.zdatainc.com/2014/08/real-time-streaming-apache-spark-streaming/ + +* [Real Time Streaming with Apache Spark (Streaming)][4] + +[5]: http://kafka.apache.org/ + +* [Apache Kafka][5] + +[6]: http://en.wikipedia.org/wiki/Data_parallelism + +* [Wikipedia: Data Parallelism][6] + +[7]: http://en.wikipedia.org/wiki/Task_parallelism + +* [Wikipedia: Task Parallelism][7] + +[8]: http://zookeeper.apache.org + +* [Apache ZooKeeper][8] + +[9]: https://github.com/yahoo/storm-yarn + +* [Yahoo! Storm-YARN][9] + +[10]: http://hortonworks.com/kb/storm-on-yarn-install-on-hdp2-beta-cluster/ + +* [Hortonworks: Storm on YARN][10] + +[11]: http://mesos.apache.org + +* [Apache Mesos][11] + +[12]: https://mesosphere.io/learn/run-storm-on-mesos/ + +* [Run Storm on Mesos][12] + +[13]: https://github.com/mesosphere/marathon + +* [Marathon][13] + +[14]: http://aws.amazon.com/s3/ + +[15]: http://en.wikipedia.org/wiki/Network_File_System + +[16]: http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html + +[17]: https://issues.apache.org/jira/browse/STORM/ + +[18]: https://issues.apache.org/jira/browse/SPARK/ + +[19]: http://www.scala-lang.org/ + +[20]: http://clojure.org/ + +[21]: http://en.wikipedia.org/wiki/Lisp_(programming_language) + +[22]: https://github.com/zeromq/jzmq + +[23]: http://zeromq.org/ + +[24]: http://netty.io/ + +[25]: http://yahooeng.tumblr.com/post/64758709722/making-storm-fly-with-netty + +[26]: http://akka.io + +[27]: http://www.apache.org/ + +[28]: 
http://supervisord.org

[29]: http://xinhstechblog.blogspot.com/2014/06/storm-vs-spark-streaming-side-by-side.html

* [Storm vs Spark Streaming: Side by Side][29]

[30]: http://www.slideshare.net/ptgoetz/apache-storm-vs-spark-streaming

* [Storm vs Spark Streaming (Slideshare)][30]

[31]: http://en.wikipedia.org/wiki/Batch_processing

[32]: http://en.wikipedia.org/wiki/Event_stream_processing

[33]: https://storm.incubator.apache.org/documentation/Trident-API-Overview.html

[34]: http://storm.incubator.apache.org/documentation/Powered-By.html

[35]: http://en.wikipedia.org/wiki/Process_supervision

[36]: http://en.wikipedia.org/wiki/Extract,_transform,_load

[37]: http://en.wikipedia.org/wiki/Window_function_(SQL)#Window_function

[38]: https://gist.github.com/kennyballou/c6ff37e5eef6710794a6
diff --git a/content/blog/Storm.markdown b/content/blog/Storm.markdown
new file mode 100644
index 0000000..8ff3707
--- /dev/null
+++ b/content/blog/Storm.markdown
@@ -0,0 +1,594 @@
---
title: "Real-Time Streaming with Apache Storm and Apache Kafka"
description: "Overview of Apache Storm and sample Twitter Sentiment Analysis"
tags:
  - "Apache Storm"
  - "Apache Kafka"
  - "Apache"
  - "Java"
  - "Sentiment Analysis"
  - "Real-time Streaming"
  - "ZData Inc."
date: "2014-07-16"
categories:
  - "Apache"
  - "Development"
  - "Real-time Systems"
slug: "real-time-streaming-storm-and-kafka"
---

The following post is one in a series on real-time systems tangential to the
Hadoop ecosystem. First, we explore both Apache Storm and Apache Kafka as part
of a real-time processing engine. These two systems work together very well
and make for an easy development experience while still being very performant.

## About Kafka ##

[Apache Kafka][3] is a message queue rethought as a distributed commit log. It
follows the publish-subscribe messaging style, with speed and durability built
in.
+
+Kafka uses ZooKeeper to share and save state between brokers. Each broker
+maintains a set of partitions, primary and/or secondary, for each topic. A set
+of Kafka brokers working together will maintain a set of topics. Each topic has
+its partitions distributed over the participating Kafka brokers and, as of
+Kafka version 0.8, the replication factor determines, intuitively, the number
+of times a partition is duplicated for fault tolerance.
+
+While many brokered message queue systems have the broker maintain the state of
+its consumers, Kafka does not. This frees up resources for the broker to ingest
+data faster. For more information about Kafka's performance, see [Benchmarking
+Kafka][4].
+
+### Initial Thoughts ###
+
+Kafka is a very promising project, with astounding throughput and one of
+the easiest pieces of software I have had the joy of installing and
+configuring. Although Kafka is not at the production 1.0 stable release yet,
+it's well on its way.
+
+## About Storm ##
+
+[Apache Storm][1], currently in incubation, is a real-time computational engine
+made available under the free and open-source Apache License, version 2.0. It
+runs continuously, consuming data from the configured sources (Spouts) and
+passing the data down the processing pipeline (Bolts). Combined, Spouts and
+Bolts make a Topology. A topology can be written in any language, including any
+JVM-based language, Python, Ruby, Perl, or, with some work, even C [[2][2]].
+
+### Why Storm ###
+
+Quoting from the project site:
+
+> Storm has many use cases: realtime analytics, online machine learning,
+> continuous computation, distributed RPC, ETL, and more. Storm is fast: a
+> benchmark clocked it at over a million tuples processed per second per node.
+> It is scalable, fault-tolerant, guarantees your data will be processed, and
+> is easy to set up and operate. [[1][1]]
+
+### Integration ###
+
+Storm can integrate with any queuing and any database system.
In fact, there
+are already quite a few existing projects for integrating Storm with other
+systems, such as Kestrel or Kafka [[5][5]].
+
+### Initial Thoughts ###
+
+I found Storm's verbiage around the computational pipeline to fit my mental
+model very well; thinking about streaming computational processes as directed
+acyclic graphs makes a lot of intuitive sense. That said, although I haven't
+been developing against Storm for very long, I do find some integration tasks
+to be slightly awkward. For example, writing an HDFS file writer bolt
+requires some special considerations given bolt life cycles and HDFS writing
+patterns. This is really only a minor blemish, however, as it only means the
+developers of Storm topologies have to understand the API more intimately;
+there are already common patterns emerging that should be adaptable to almost
+any situation [[16][16]].
+
+## Test Project: Twitter Stream Sentiment Analysis ##
+
+To really give Storm a try, something a little more involved than just a simple
+word counter is needed. Therefore, I have put together a Twitter Sentiment
+Analysis topology. Though this is a good representative example of a more
+complicated topology, the method used for actually scoring the Twitter data is
+overly simple.
+
+### Setup ###
+
+The setup used for this demo is a 5-node Vagrant virtual cluster. Each node is
+running 64-bit CentOS 6.5 and is given 1 core and 1024MB of RAM. Every node is
+running HDFS (datanode), ZooKeeper, and Kafka. The first node, `node0`, runs
+the namenode and Nimbus -- Storm's master daemon. `node0` is also running a
+[Docker][7] container with a NodeJS application, part of the reporting process.
+The remaining nodes, `node[1-4]`, are Storm worker nodes. Storm, Kafka, and
+ZooKeeper are all run under supervision via [Supervisord][6], so
+high availability is baked into this virtual cluster.
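Process supervision here amounts to a small program entry per daemon in the supervisord configuration. The entry below is a hedged sketch, not the actual cluster configuration; the program name, paths, and log locations are hypothetical:

```ini
[program:storm-supervisor]
; Run the Storm worker daemon in the foreground so supervisord can watch it.
command=/opt/storm/bin/storm supervisor
autostart=true
autorestart=true
stdout_logfile=/var/log/storm/supervisor.out.log
stderr_logfile=/var/log/storm/supervisor.err.log
```

Because the daemon runs in the foreground under supervisord rather than daemonizing itself, a crash is detected immediately and the process is restarted, which is what provides the high availability mentioned above.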
+
+### Overview ###
+
+![Sentiment Analysis Topology][satimg]
+
+I wrote a simple Kafka producer that reads files off disk and sends them to the
+Kafka cluster. This is how we feed the whole system; it is used in lieu of
+opening a stream to Twitter.
+
+#### Spout ####
+
+The orange node in the picture is our [`KafkaSpout`][8], which will be
+consuming incoming messages from the Kafka brokers.
+
+#### Twitter Data JSON Parsing ####
+
+The first bolt, `2` in the image, attempts to parse the Twitter JSON data and
+emits `tweet_id` and `tweet_text`. This implementation only processes English
+tweets. If the topology were more ambitious, it might pass the language
+code down and create different scoring bolts for each language.
+
+#### Filtering and Stemming ####
+
+The next bolt, `3`, performs first-round data sanitization. That is, it
+removes all non-alpha characters.
+
+The next round of data cleansing, `4`, removes common words to reduce noise
+for the classifiers. These common words are usually referred to
+as _stop words_. [_Stemming_][15], or reducing words to their root forms,
+could also be performed here, or in another, later bolt.
+
+`4`, when finished, will then broadcast the data to both of the classifiers.
+
+#### Classifiers ####
+
+Each classifier is defined by its own bolt: one classifier for the positive
+class, `5`, and another for the negative class, `6`.
+
+The implementation of the classifiers follows the [Bag-of-words][12] model.
+
+#### Join and Scoring ####
+
+Next, bolt `7` joins the scores from the two previous classifiers. The
+implementation of this bolt isn't perfect. For example, message processing is
+not guaranteed: if one side of the join fails, neither side is notified.
+
+To finish up the scoring, bolt `8` compares the scores from the classifiers and
+declares the class accordingly.
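Before looking at the actual bolt code, the bag-of-words scoring used by the classifiers can be sketched independently of Storm. This is only an illustration; the class name, word list, and sample tweet below are made up, not taken from the project:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public final class BagOfWordsScorer {

    /** Fraction of the words in text that appear in classWords. */
    public static float score(Set<String> classWords, String text) {
        String[] words = text.toLowerCase().split("\\s+");
        int hits = 0;
        for (String word : words) {
            if (classWords.contains(word)) {
                hits++;
            }
        }
        return (float) hits / words.length;
    }

    public static void main(String[] args) {
        // Hypothetical positive-class vocabulary.
        Set<String> positive = new HashSet<String>(
                Arrays.asList("great", "love", "happy"));
        // Two of the five words are in the vocabulary: 2 / 5 = 0.4.
        System.out.println(score(positive, "love this great storm demo"));
    }
}
```

Each classifier bolt computes exactly this kind of ratio against its own word list; the join bolt then pairs the positive and negative ratios per tweet.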
+
+#### Reporting: HDFS and HTTP POST ####
+
+Finally, the scoring bolt broadcasts the results to an HDFS file writer
+bolt, `9`, and a NodeJS notifier bolt, `10`. The HDFS bolt fills a list until
+it has 1000 records in it and then spools to disk. Of course, like the join
+bolt, this could be better implemented to fail input tuples if the bolt fails
+to write to disk or have a timeout to write to disk after a certain
+period of time. The NodeJS notifier bolt, on the other hand, sends an HTTP POST
+each time a record is received. This could be batched as well, but again, this
+is for demonstration purposes only.
+
+### Implementing the Kafka Producer ###
+
+Here are the main portions of the code for the Kafka producer that was used to
+test our cluster.
+
+In the main class, we set up the data pipes and threads:
+
+    LOGGER.debug("Setting up streams");
+    PipedInputStream send = new PipedInputStream(BUFFER_LEN);
+    PipedOutputStream input = new PipedOutputStream(send);
+
+    LOGGER.debug("Setting up connections");
+    LOGGER.debug("Setting up file reader");
+    BufferedFileReader reader = new BufferedFileReader(filename, input);
+    LOGGER.debug("Setting up kafka producer");
+    KafkaProducer kafkaProducer = new KafkaProducer(topic, send);
+
+    LOGGER.debug("Spinning up threads");
+    Thread source = new Thread(reader);
+    Thread kafka = new Thread(kafkaProducer);
+
+    source.start();
+    kafka.start();
+
+    LOGGER.debug("Joining");
+    kafka.join();
+
+The `BufferedFileReader`, in its own thread, reads the data off disk:
+
+    rd = new BufferedReader(new FileReader(this.fileToRead));
+    wd = new BufferedWriter(new OutputStreamWriter(this.outputStream, ENC));
+    int b = -1;
+    while ((b = rd.read()) != -1)
+    {
+        wd.write(b);
+    }
+
+Finally, the `KafkaProducer` sends asynchronous messages to the Kafka Cluster:
+
+    rd = new BufferedReader(new InputStreamReader(this.inputStream, ENC));
+    String line = null;
+    producer = new Producer<Integer, String>(conf);
+    while ((line =
rd.readLine()) != null)
+    {
+        producer.send(new KeyedMessage<Integer, String>(this.topic, line));
+    }
+
+Doing these operations on separate threads gives us the benefit of having disk
+reads not block the Kafka producer, or vice-versa, enabling maximum performance
+tunable by the size of the buffer.
+
+### Implementing the Storm Topology ###
+
+#### Topology Definition ####
+
+Moving onward to Storm, here we define the topology and how each bolt will
+communicate with the others:
+
+    TopologyBuilder topology = new TopologyBuilder();
+
+    topology.setSpout("kafka_spout", new KafkaSpout(kafkaConf), 4);
+
+    topology.setBolt("twitter_filter", new TwitterFilterBolt(), 4)
+        .shuffleGrouping("kafka_spout");
+
+    topology.setBolt("text_filter", new TextFilterBolt(), 4)
+        .shuffleGrouping("twitter_filter");
+
+    topology.setBolt("stemming", new StemmingBolt(), 4)
+        .shuffleGrouping("text_filter");
+
+    topology.setBolt("positive", new PositiveSentimentBolt(), 4)
+        .shuffleGrouping("stemming");
+    topology.setBolt("negative", new NegativeSentimentBolt(), 4)
+        .shuffleGrouping("stemming");
+
+    topology.setBolt("join", new JoinSentimentsBolt(), 4)
+        .fieldsGrouping("positive", new Fields("tweet_id"))
+        .fieldsGrouping("negative", new Fields("tweet_id"));
+
+    topology.setBolt("score", new SentimentScoringBolt(), 4)
+        .shuffleGrouping("join");
+
+    topology.setBolt("hdfs", new HDFSBolt(), 4)
+        .shuffleGrouping("score");
+    topology.setBolt("nodejs", new NodeNotifierBolt(), 4)
+        .shuffleGrouping("score");
+
+Notably, the data is shuffled to each bolt except when joining, as it's
+very important that the same tweets are given to the same instance of the
+joining bolt. To read more about stream groupings, visit the [Storm
+documentation][17].
+
+#### Twitter Data Filter / Parser ####
+
+Parsing the Twitter JSON data is one of the first things that needs to be done.
+This is accomplished with the use of the [JacksonXML Databind][11] library.
+ + JsonNode root = mapper.readValue(json, JsonNode.class); + long id; + String text; + if (root.get("lang") != null && + "en".equals(root.get("lang").textValue())) + { + if (root.get("id") != null && root.get("text") != null) + { + id = root.get("id").longValue(); + text = root.get("text").textValue(); + collector.emit(new Values(id, text)); + } + else + LOGGER.debug("tweet id and/ or text was null"); + } + else + LOGGER.debug("Ignoring non-english tweet"); + +As mentioned before, `tweet_id` and `tweet_text` are the only values being +emitted. + +This is just using the basic `ObjectMapper` class from the Jackson Databind +library to map the simple JSON object provided by the Twitter Streaming API to +a `JsonNode`. The language code is first tested to be English, as the topology +does not support non-English tweets. The new tuple is emitted down the Storm +pipeline after ensuring the existence of the desired data, namely, `tweet_id`, +and `tweet_text`. + +#### Text Filtering #### + +As previously mentioned, punctuation and other symbols are removed because they +are just noise to the classifiers: + + Long id = input.getLong(input.fieldIndex("tweet_id")); + String text = input.getString(input.fieldIndex("tweet_text")); + text = text.replaceAll("[^a-zA-Z\\s]", "").trim().toLowerCase(); + collector.emit(new Values(id, text)); + +_Very_ common words are also removed because they are similarly noisy to the +classifiers: + + Long id = input.getLong(input.fieldIndex("tweet_id")); + String text = input.getString(input.fieldIndex("tweet_text")); + List<String> stopWords = StopWords.getWords(); + for (String word : stopWords) + { + text = text.replaceAll(word, ""); + } + collector.emit(new Values(id, text)); + +Here the `StopWords` class is a singleton holding the list of words we +wish to omit. 
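The `StopWords` class itself is not shown above; a minimal sketch consistent with the `StopWords.getWords()` call in the bolt might look like the following. The word list here is a short placeholder, not the project's actual list, which would presumably be loaded from a resource file:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public final class StopWords {

    private static StopWords instance = null;
    private final List<String> words;

    private StopWords() {
        // Placeholder list; the real topology would use a full stop-word list.
        this.words = Collections.unmodifiableList(Arrays.asList(
                "the", "a", "an", "and", "or", "of", "to", "in", "is"));
    }

    // Lazy, synchronized initialization keeps construction safe when
    // multiple bolt tasks run in the same worker JVM.
    public static synchronized StopWords getInstance() {
        if (instance == null) {
            instance = new StopWords();
        }
        return instance;
    }

    public static List<String> getWords() {
        return getInstance().words;
    }
}
```

The filtering bolt above simply iterates this list, so a read-only `List<String>` shared by all tasks is sufficient.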
+
+#### Positive and Negative Scoring ####
+
+Since the approach for scoring is based on a very limited [Bag-of-words][12]
+model, each class is put into its own bolt; this also contrives the need for a
+join later.
+
+Positive Scoring:
+
+    Long id = input.getLong(input.fieldIndex("tweet_id"));
+    String text = input.getString(input.fieldIndex("tweet_text"));
+    Set<String> posWords = PositiveWords.getWords();
+    String[] words = text.split(" ");
+    int numWords = words.length;
+    int numPosWords = 0;
+    for (String word : words)
+    {
+        if (posWords.contains(word))
+            numPosWords++;
+    }
+    collector.emit(new Values(id, (float) numPosWords / numWords, text));
+
+Negative Scoring:
+
+    Long id = input.getLong(input.fieldIndex("tweet_id"));
+    String text = input.getString(input.fieldIndex("tweet_text"));
+    Set<String> negWords = NegativeWords.getWords();
+    String[] words = text.split(" ");
+    int numWords = words.length;
+    int numNegWords = 0;
+    for (String word : words)
+    {
+        if (negWords.contains(word))
+            numNegWords++;
+    }
+    collector.emit(new Values(id, (float) numNegWords / numWords, text));
+
+Similar to `StopWords`, `PositiveWords` and `NegativeWords` are both singletons
+maintaining lists of positive and negative words, respectively.
+
+#### Joining Scores ####
+
+Joining in Storm isn't the most straightforward process to implement, although
+the process may be made simpler with the use of the [Trident API][13]. However,
+to get a feel for what to do without Trident, and as an academic exercise, the
+join is not implemented with the Trident API. On a related note, this join
+isn't perfect! It ignores a couple of issues; namely, it does not correctly
+fail a tuple on a one-sided join (when tweets are received twice from the same
+scoring bolt) and doesn't time out tweets if they are left in the queue for too
+long.
With this in mind, here is our join:

+    Long id = input.getLong(input.fieldIndex("tweet_id"));
+    String text = input.getString(input.fieldIndex("tweet_text"));
+    if (input.contains("pos_score"))
+    {
+        Float pos = input.getFloat(input.fieldIndex("pos_score"));
+        if (this.tweets.containsKey(id))
+        {
+            Triple<String, Float, String> triple = this.tweets.get(id);
+            if ("neg".equals(triple.getCar()))
+                // triple holds (class tag, score, text)
+                emit(collector, id, triple.getCdr(), pos, triple.getCaar());
+            else
+            {
+                LOGGER.warn("one sided join attempted");
+                this.tweets.remove(id);
+            }
+        }
+        else
+            this.tweets.put(
+                id,
+                new Triple<String, Float, String>("pos", pos, text));
+    }
+    else if (input.contains("neg_score"))
+    {
+        Float neg = input.getFloat(input.fieldIndex("neg_score"));
+        if (this.tweets.containsKey(id))
+        {
+            Triple<String, Float, String> triple = this.tweets.get(id);
+            if ("pos".equals(triple.getCar()))
+                emit(collector, id, triple.getCdr(), triple.getCaar(), neg);
+            else
+            {
+                LOGGER.warn("one sided join attempted");
+                this.tweets.remove(id);
+            }
+        }
+        else
+            this.tweets.put(
+                id,
+                new Triple<String, Float, String>("neg", neg, text));
+    }
+
+Where `emit` is defined simply by:
+
+    private void emit(
+        BasicOutputCollector collector,
+        Long id,
+        String text,
+        float pos,
+        float neg)
+    {
+        collector.emit(new Values(id, pos, neg, text));
+        this.tweets.remove(id);
+    }
+
+#### Deciding the Winning Class ####
+
+To ensure the [Single responsibility principle][14] is not violated, joining
+and scoring are split into separate bolts, though the class of the tweet could
+certainly be decided at the time of joining.
+
+    Long id = tuple.getLong(tuple.fieldIndex("tweet_id"));
+    String text = tuple.getString(tuple.fieldIndex("tweet_text"));
+    Float pos = tuple.getFloat(tuple.fieldIndex("pos_score"));
+    Float neg = tuple.getFloat(tuple.fieldIndex("neg_score"));
+    String score = pos > neg ?
"positive" : "negative"; + collector.emit(new Values(id, text, pos, neg, score)); + +This decider will prefer negative scores, so if there is a tie, it's +automatically handed to the negative class. + +#### Report the Results #### + +Finally, there are two bolts that will yield results: one that writes +data to HDFS, and another to send the data to a web server. + + Long id = input.getLong(input.fieldIndex("tweet_id")); + String tweet = input.getString(input.fieldIndex("tweet_text")); + Float pos = input.getFloat(input.fieldIndex("pos_score")); + Float neg = input.getFloat(input.fieldIndex("neg_score")); + String score = input.getString(input.fieldIndex("score")); + String tweet_score = + String.format("%s,%s,%f,%f,%s\n", id, tweet, pos, neg, score); + this.tweet_scores.add(tweet_score); + if (this.tweet_scores.size() >= 1000) + { + writeToHDFS(); + this.tweet_scores = new ArrayList<String>(1000); + } + +Where `writeToHDFS` is primarily given by: + + Configuration conf = new Configuration(); + conf.addResource(new Path("/opt/hadoop/etc/hadoop/core-site.xml")); + conf.addResource(new Path("/opt/hadoop/etc/hadoop/hdfs-site.xml")); + hdfs = FileSystem.get(conf); + file = new Path( + Properties.getString("rts.storm.hdfs_output_file") + this.id); + if (hdfs.exists(file)) + os = hdfs.append(file); + else + os = hdfs.create(file); + wd = new BufferedWriter(new OutputStreamWriter(os, "UTF-8")); + for (String tweet_score : tweet_scores) + { + wd.write(tweet_score); + } + +And our `HTTP POST` to a web server: + + Long id = input.getLong(input.fieldIndex("tweet_id")); + String tweet = input.getString(input.fieldIndex("tweet_text")); + Float pos = input.getFloat(input.fieldIndex("pos_score")); + Float neg = input.getFloat(input.fieldIndex("neg_score")); + String score = input.getString(input.fieldIndex("score")); + HttpPost post = new HttpPost(this.webserver); + String content = String.format( + "{\"id\": \"%d\", " + + "\"text\": \"%s\", " + + "\"pos\": \"%f\", " + + "\"neg\": 
\"%f\", " +
+        "\"score\": \"%s\" }",
+        id, tweet, pos, neg, score);
+
+    try
+    {
+        post.setEntity(new StringEntity(content));
+        HttpResponse response = client.execute(post);
+        org.apache.http.util.EntityUtils.consume(response.getEntity());
+    }
+    catch (Exception ex)
+    {
+        LOGGER.error("exception thrown while attempting post", ex);
+        LOGGER.trace(null, ex);
+        reconnect();
+    }
+
+There are some faults to point out with both of these bolts. Namely, the HDFS
+bolt tries to batch the writes into 1000 tweets, but does not keep track of the
+tuples, nor does it time out tuples or flush them at some interval. That is, if
+a write fails or if the queue sits idle for too long, the topology is not
+notified and can't resend the tuples. Similarly, the `HTTP POST` bolt does not
+batch and sends each POST synchronously, causing the bolt to block for each
+message. This can be alleviated with more instances of this bolt and more web
+servers to handle the increase in posts, and/or a better batching process.
+
+## Summary ##
+
+The full source of this test application can be found on [GitHub][9].
+
+Apache Storm and Apache Kafka both have great potential in the real-time
+streaming market and have so far proven themselves to be very capable systems
+for performing real-time analytics.
+
+Stay tuned, as the next post in this series will be an overview of
+streaming with Apache Spark.
+
+## Related Links / References ##
+
+[satimg]: https://localhost/media/SentimentAnalysisTopology.png
+
+[1]: http://storm.incubator.apache.org/
+
+* [Apache Storm Project Page][1]
+
+[2]: http://storm.incubator.apache.org/about/multi-language.html
+
+* [Storm Multi-Language Documentation][2]
+
+[3]: http://kafka.apache.org/
+
+* [Apache Kafka Project Page][3]
+
+[4]: http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
+
+* [LinkedIn Kafka Benchmarking: 2 million writes per second][4]
+
+[5]: http://storm.incubator.apache.org/about/integrates.html
+
+* [Storm Integration Documentation][5]
+
+[6]: http://supervisord.org/
+
+* [Supervisord Project Page][6]
+
+[7]: http://www.docker.io/
+
+* [Docker IO Project Page][7]
+
+[8]: https://github.com/apache/incubator-storm/tree/master/external/storm-kafka
+
+* [Storm-Kafka Source][8]
+
+[9]: https://github.com/zdata-inc/StormSampleProject
+
+* [Full Source of Test Project][9]
+
+[10]: https://wiki.apache.org/incubator/StormProposal
+
+* [Apache Storm Incubation Proposal][10]
+
+[11]: https://github.com/FasterXML/jackson-databind
+
+* [Jackson Databind Project Page][11]
+
+[12]: http://en.wikipedia.org/wiki/Bag-of-words_model
+
+* [Wikipedia: Bag of words][12]
+
+[13]: http://storm.incubator.apache.org/documentation/Trident-API-Overview.html
+
+* [Storm Trident API Overview][13]
+
+[14]: http://en.wikipedia.org/wiki/Single_responsibility_principle
+
+* [Wikipedia: Single responsibility principle][14]
+
+[15]: http://en.wikipedia.org/wiki/Stemming
+
+* [Wikipedia: Stemming][15]
+
+[16]: http://storm.incubator.apache.org/documentation/Common-patterns.html
+
+* [Storm Documentation: Common Patterns][16]
+
+[17]: http://storm.incubator.apache.org/documentation/Concepts.html#stream-groupings
+
+* [Stream Groupings][17]
diff --git a/layouts/404.html b/layouts/404.html
new file mode 100644
index 0000000..0c15e5a
--- /dev/null
+++ b/layouts/404.html
@@ -0,0 +1,8 @@
+{{ 
partial "header.html" . }} +<body> +<h1>You Seem Lost...</h1> +<p>The page you were looking for seems to have left the server or never +existed. If you think this may be an error, sorry. The web server thinks you +have an error.</p> +</body> +{{ partial "footer.html" . }} diff --git a/layouts/_default/list.html b/layouts/_default/list.html new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/layouts/_default/list.html diff --git a/layouts/_default/single.html b/layouts/_default/single.html new file mode 100644 index 0000000..00aaabc --- /dev/null +++ b/layouts/_default/single.html @@ -0,0 +1,17 @@ +{{ partial "header.html" . }} +<body> +{{ partial "subheader.html" . }} +<section id="main"> + <div> + <div class="colleft"> + <article id="content"> + {{ .Content}} + </article> + </div> + <div class="colright"> + {{ partial "author.html" . }} + </div> + </div> +</section> +</body> +{{ partial "footer.html" . }} diff --git a/layouts/blog/li.html b/layouts/blog/li.html new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/layouts/blog/li.html diff --git a/layouts/blog/single.html b/layouts/blog/single.html new file mode 100644 index 0000000..1086aa8 --- /dev/null +++ b/layouts/blog/single.html @@ -0,0 +1,27 @@ +{{ partial "header.html" . }} +<body> +{{ partial "subheader.html" . }} +<section id="main"> + <div> + <div class="colleft"> + <article id="content"> + <h1 id="title"> {{ .Title }}</h1> + <div class="post-meta"> + <ul class="tags"> + <li><i class="fa fa-tags"></i></li> + {{ range .Params.tags }} + <li>{{ . }}</li> + {{ end }} + </ul> + <h4 id="date"> {{ .Date.Format "Mon Jan 2, 2006" }}</h4> + </div> + {{ .Content}} + </article> + </div> + <div class="colright"> + {{ partial "author.html" . }} + </div> + </div> +</section> +</body> +{{ partial "footer.html" . 
}}
diff --git a/layouts/blog/summary.html b/layouts/blog/summary.html
new file mode 100644
index 0000000..6fe1d59
--- /dev/null
+++ b/layouts/blog/summary.html
@@ -0,0 +1,20 @@
+<article class="post">
+  <header>
+    <h2><a href="{{ .Permalink }}">{{ .Title }}</a></h2>
+    <div class="post-meta">{{ .Date.Format "Mon Jan 2, 2006" }}</div>
+  </header>
+
+  <blockquote>
+    {{ .Summary }}
+  </blockquote>
+  <ul class="tags">
+    <li><i class="fa fa-tags"></i></li>
+    {{ range .Params.tags }}
+    <li>{{ . }}</li>
+    {{ end }}
+  </ul>
+
+  <footer>
+    <a href="{{ .Permalink }}"><nobr>Read More</nobr></a>
+  </footer>
+</article>
diff --git a/layouts/index.html b/layouts/index.html
new file mode 100644
index 0000000..528a97c
--- /dev/null
+++ b/layouts/index.html
@@ -0,0 +1,20 @@
+{{ partial "header.html" . }}
+<body>
+{{ partial "subheader.html" . }}
+<section id="main">
+  <div>
+    <div class="colleft">
+      {{ range first 10 .Data.Pages }}
+      {{ if eq .Section "blog" }}
+      {{ .Render "summary" }}
+      {{ end }}
+      {{ end }}
+    </div>
+    <div class="colright">
+      {{ partial "author.html" . }}
+    </div>
+  </div>
+</section>
+
+
+{{ partial "footer.html" . 
}} diff --git a/layouts/partials/author.html b/layouts/partials/author.html new file mode 100644 index 0000000..a1c7939 --- /dev/null +++ b/layouts/partials/author.html @@ -0,0 +1,21 @@ +<section> + <div id="author"> + <a class="fade" href="about-me/"><h3 class="fade">Kenny Ballou</h3></a> + <h4><code>:(){ :|:& };:</code></h4> + <a href="https://github.com/kennyballou"> + <i class="fade fa fa-github-square fa-2x"></i> + </a> + <a href="https://bitbucket.org/kballou"> + <i class="fade fa fa-bitbucket-square fa-2x"></i> + </a> + <a href="http://www.linkedin.com/pub/kenny-ballou/43/1a9/90"> + <i class="fade fa fa-linkedin-square fa-2x"></i> + </a> + <a href="https://plus.google.com/+KennyBallou"> + <i class="fade fa fa-google-plus-square fa-2x"></i> + </a> + <a href="https://twitter.com/kennyballou"> + <i class="fade fa fa-twitter-square fa-2x"></i> + </a> + </div> +</section> diff --git a/layouts/partials/footer.html b/layouts/partials/footer.html new file mode 100644 index 0000000..5fb79e9 --- /dev/null +++ b/layouts/partials/footer.html @@ -0,0 +1,13 @@ +<footer> + <div id="footer"> + <div class="colleft"> + <p>© 2014 Kenny Ballou. 
<a
+      href="http://creativecommons.org/licenses/by/3.0/">Some rights
+      reserved.</a></p>
+    </div>
+    <div class="colright">
+      <p>Powered by <a href="http://gohugo.io/">Hugo.</a></p>
+    </div>
+  </div>
+</footer>
+</html>
diff --git a/layouts/partials/head_includes.html b/layouts/partials/head_includes.html
new file mode 100644
index 0000000..2df69ce
--- /dev/null
+++ b/layouts/partials/head_includes.html
@@ -0,0 +1,4 @@
+<link rel="stylesheet" href="/css/site.css" />
+<link rel="stylesheet"
+    href="//maxcdn.bootstrapcdn.com/font-awesome/4.2.0/css/font-awesome.min.css">
+<script src="{{ .Site.BaseUrl }}/js/site.js"></script>
diff --git a/layouts/partials/header.html b/layouts/partials/header.html
new file mode 100644
index 0000000..2dc4d46
--- /dev/null
+++ b/layouts/partials/header.html
@@ -0,0 +1,20 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8">
+
+    {{ partial "meta.html" . }}
+
+    <base href="{{ .Site.BaseUrl }}" />
+    <title>{{ .Site.Title }}</title>
+
+    <link rel="canonical" href="{{ .Permalink }}" />
+    {{ if .RSSlink }}
+    <link href="{{ .RSSlink }}" rel="alternate" type="application/rss+xml"
+        title="{{ .Title }}" />
+    {{ end }}
+
+    {{ partial "head_includes.html" . 
}} + <link rel="stylesheet" href="{{ .Site.BaseUrl }}/css/site.css" /> + <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon" /> +</head> diff --git a/layouts/partials/meta.html b/layouts/partials/meta.html new file mode 100644 index 0000000..ec09d41 --- /dev/null +++ b/layouts/partials/meta.html @@ -0,0 +1,2 @@ +<meta name="author" content="Kenny Ballou"> +<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1"> diff --git a/layouts/partials/subheader.html b/layouts/partials/subheader.html new file mode 100644 index 0000000..3a4aaae --- /dev/null +++ b/layouts/partials/subheader.html @@ -0,0 +1,5 @@ +<div id="header"> + <header> + <a class="fade" href="{{ .Site.BaseUrl }}">~kballou</a> + </header> +</div> diff --git a/layouts/rss.xml b/layouts/rss.xml new file mode 100644 index 0000000..5950804 --- /dev/null +++ b/layouts/rss.xml @@ -0,0 +1,21 @@ +<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom"> + <channel> + <title>{{ .Title }} on {{ .Site.Title }} </title> + <generator uri="https://gohugo.io">Hugo</generator> + <link>{{ .Permalink }}</link> + {{ with .Site.LanguageCode }}<language>{{ . }}</language>{{ end }} + {{ with .Site.Author.name }}<author>{{ . }}</author>{{ end }} + {{ with .Site.Copyright }}<copyright>{{ . }}</copyright>{{ end }} + <updated>{{ .Date.Format "Mon, 02 Jan 2006 15:04:05 MST" }}</updated> + {{ range first 15 .Data.Pages }} + <item> + <title>{{ .Title }}</title> + <link>{{ .Permalink }}</link> + <pubDate>{{ .Date.Format "Mon, 02 Jan 2006 15:04:05 MST" }} </pubDate> + {{ with .Site.Author.name }}<author>{{ . }}</author>{{ end }} + <guid>{{ .Permalink }}</guid> + <description>{{ .Content | html }}</description> + </item> + {{ end }} + </channel> +</rss> diff --git a/layouts/single.html b/layouts/single.html new file mode 100644 index 0000000..5c0336c --- /dev/null +++ b/layouts/single.html @@ -0,0 +1,5 @@ +{{ partial "header.html" . 
}}
+<body>
+    {{ .Content}}
+</body>
+{{ partial "footer.html" . }}
diff --git a/layouts/sitemap.xml b/layouts/sitemap.xml
new file mode 100644
index 0000000..e99a1bc
--- /dev/null
+++ b/layouts/sitemap.xml
@@ -0,0 +1,16 @@
+<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
+  {{ range .Data.Pages }}
+  <url>
+    <loc>{{ .Permalink }}</loc>
+    <lastmod>
+      {{ safeHtml ( .Date.Format "2006-01-02T15:04:05-07:00" ) }}
+    </lastmod>
+    {{ with .Sitemap.ChangeFreq }}
+    <changefreq>{{ . }}</changefreq>
+    {{ end }}
+    {{ if ge .Sitemap.Priority 0.0 }}
+    <priority>{{ .Sitemap.Priority }}</priority>
+    {{ end }}
+  </url>
+  {{ end }}
+</urlset>
diff --git a/static/css/site.css b/static/css/site.css
new file mode 100644
index 0000000..bbd6065
--- /dev/null
+++ b/static/css/site.css
@@ -0,0 +1,131 @@
+body {
+    box-sizing: border-box;
+    font: 13px Helvetica, Arial;
+    background-color: #282828;
+    color: #F8F8F2;
+}
+h1 {
+    color: #F92672;
+}
+h2, h4 {
+    color: #66D9EF;
+}
+h3, h5 {
+    color: #A6E22E;
+}
+a {
+    color: #AE81FF;
+    text-decoration: none;
+}
+a:hover {
+    text-decoration: underline;
+}
+blockquote {
+    padding: 1em 1em;
+    border-left: 5px solid #333333;
+    margin: 0 0 1em;
+}
+.post-meta {
+    font-style: italic;
+}
+#author {
+    padding: 5em;
+}
+#content {
+    margin: 2em 2em 5em 3em;
+    width: 100%;
+}
+#header {
+    background-color: #282828;
+    display: block;
+    height: auto;
+    overflow: visible;
+    padding-right: 2em;
+    padding-left: 0.7em;
+    border-bottom: 1px solid #292929;
+}
+#footer {
+    background-color: #282828;
+    height: 2.5em;
+    padding: 1em 0;
+    overflow: hidden;
+    border-top: 1px solid #292929;
+    position: fixed;
+    bottom: 0;
+    right: 0;
+    left: 0;
+    z-index: 1030;
+    margin-bottom: 0;
+    margin-left: 2em;
+    border-width: 1px 0 0;
+}
+#header header a {
+    color: #F92672;
+    font-size: 210%;
+}
+#header header a:hover {
+    text-decoration: none;
+}
+div#author a:hover {
+    text-decoration: none;
+}
+.fade {
+    opacity: 0.5;
+    transition: opacity .25s ease-in-out;
+    -moz-transition: opacity .25s ease-in-out;
+    -webkit-transition: opacity .25s ease-in-out;
+}
+.fade:hover {
+    opacity: 1;
+}
+pre {
+    padding: 9.5px;
+    margin: 0 0 10px;
+    word-break: break-all;
+    word-wrap: break-word;
+    color: #2b2b2b;
+    background-color: #F5F5F5;
+    border: 1px solid #CCCCCC;
+    border-radius: 4px;
+}
+code {
+    padding: 0.25em;
+    color: #F8F8F2;
+    font-family: DejaVu Sans Mono, Consolas;
+    white-space: pre-wrap;
+    border: 0;
+    border-radius: 4px;
+}
+pre code {
+    display: block;
+    background-color: #303030;
+}
+.post {
+    width: 100%;
+    margin-left: 3em;
+    margin-right: 3em;
+    margin-bottom: 3em;
+}
+.tags {
+    display: inline-block;
+    list-style: none;
+    padding-left: 0;
+    margin: 0 0 0 0.2em;
+}
+.tags li {
+    display: inline-block;
+    padding-left: 0.3em;
+}
+.tags li:nth-child(even) {
+    color: #66D9EF;
+}
+.colleft {
+    float: left;
+    width: 70%;
+    position: relative;
+}
+.colright {
+    float: right;
+    width: 20%;
+    position: relative;
+}
diff --git a/static/favicon.ico b/static/favicon.ico
Binary files differ
new file mode 100644
index 0000000..bd80b91
--- /dev/null
+++ b/static/favicon.ico
diff --git a/static/media/SentimentAnalysisTopology.png b/static/media/SentimentAnalysisTopology.png
Binary files differ
new file mode 100644
index 0000000..44d8ede
--- /dev/null
+++ b/static/media/SentimentAnalysisTopology.png
diff --git a/static/media/spark_issues_chart.png b/static/media/spark_issues_chart.png
Binary files differ
new file mode 100644
index 0000000..1932741
--- /dev/null
+++ b/static/media/spark_issues_chart.png
diff --git a/static/media/storm_issues_chart.png b/static/media/storm_issues_chart.png
Binary files differ
new file mode 100644
index 0000000..78bc99a
--- /dev/null
+++ b/static/media/storm_issues_chart.png