gearpump-dev mailing list archives

From manuzhang <...@git.apache.org>
Subject [GitHub] incubator-gearpump pull request: fix GEARPUMP-109: add wordcount J...
Date Thu, 28 Apr 2016 01:20:28 GMT
Github user manuzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/6#discussion_r61362610
  
    --- Diff: docs/dev-write-1st-app.md ---
    @@ -14,23 +14,131 @@ Repository and library dependencies can be found at [Maven Setting](maven-settin
     ### IDE Setup (Optional)
     You can get your preferred IDE ready for Gearpump by following [this guide](dev-ide-setup.html).
     
    -### Define Processor(Task) class and Partitioner class
    +### Decide which language and API to use for writing 
    +Gearpump supports two levels of API:
     
    -An application is a Directed Acyclic Graph (DAG) of processors. In the wordcount example, We will firstly define two processors `Split` and `Sum`, and then weave them together.
    +1. Low level API, which is more similar to Akka programming, operating on each event. The API document can be found at [Low Level API Doc](http://www.gearpump.io/releases/latest/api/scala/index.html#io.gearpump.streaming.package).
    +
    +2. High level API (aka DSL), which operates on streams instead of individual events. The API document can be found at [DSL API Doc](http://www.gearpump.io/releases/latest/api/scala/index.html#io.gearpump.streaming.dsl.package).
    +
    +Both APIs have a Java version and a Scala version.
    +
    +So, before writing your first Gearpump application, you need to decide which API and which language to use.
    +
    +## DSL version for Wordcount
    +
    +The easiest way to write your streaming application is to write it with Gearpump DSL.
    +The following demonstrates how to write a WordCount application with the Gearpump DSL.
    +
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1" >
    +
    +
    +```scala
    +/** WordCount with High level DSL */
    +object WordCount extends AkkaApp with ArgumentsParser {
    +
    +  override val options: Array[(String, CLIOption[Any])] = Array.empty
     
    -#### About message type
    +  override def main(akkaConf: Config, args: Array[String]): Unit = {
    +    val context = ClientContext(akkaConf)
    +    val app = StreamApp("dsl", context)
    +    val data = "This is a good start, bingo!! bingo!!"
     
    -User are allowed to send message of type Any(Accept any type except Null, Nothing and Unit).
    +    //count for each word and output to log
    +    app.source(data.lines.toList, 1, "source").
    +      // word => (word, count)
    +      flatMap(line => line.split("[\\s]+")).map((_, 1)).
    +      // (word, count1), (word, count2) => (word, count1 + count2)
    +      groupByKey().sum.log
     
    +    val appId = context.submit(app)
    +    context.close()
    +  }
    +}
     ```
    -case class Message(msg: Any, timestamp: TimeStamp = Message.noTimeStamp)
    +
    +</div>
    +
    +<div data-lang="java" markdown="1">
    +
    +```java
    +
    +/** Java version of WordCount with high level DSL API */
    +public class WordCount {
    +
    +  public static void main(String[] args) throws InterruptedException {
    +    main(ClusterConfig.defaultConfig(), args);
    +  }
    +
    +  public static void main(Config akkaConf, String[] args) throws InterruptedException {
    +    ClientContext context = new ClientContext(akkaConf);
    +    JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
    +    List<String> source = Lists.newArrayList("This is a good start, bingo!! bingo!!");
    +
    +    //create a stream from the string list.
    +    JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source");
    +
    +    //tokenize the strings and create a new stream
    +    JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() {
    +      @Override
    +      public Iterator<String> apply(String s) {
    +        return Lists.newArrayList(s.split("\\s+")).iterator();
    +      }
    +    }, "flatMap");
    +
    +    //map each string as (string, 1) pair
    +    JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
    +      @Override
    +      public Tuple2<String, Integer> apply(String s) {
    +        return new Tuple2<String, Integer>(s, 1);
    +      }
    +    }, "map");
    +
    +    //group by according to string
    +    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
    +      @Override
    +      public String apply(Tuple2<String, Integer> tuple) {
    +        return tuple._1();
    +      }
    +    }, 1, "groupBy");
    +
    +    //for each group, make the sum
    +    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    +      @Override
    +      public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
    +        return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
    +      }
    +    }, "reduce");
    +
    +    //output result using log
    +    wordcount.log();
    +
    +    app.run();
    +    context.close();
    +  }
    +}
     ```
     
    +</div>
    +
    +</div>
    +
    +## Low level API based Wordcount
    +
    +### Define Processor(Task) class and Partitioner class
    +
    +An application is a Directed Acyclic Graph (DAG) of processors. In the wordcount example, We will firstly define two processors `Split` and `Sum`, and then weave them together.
    +
    +
     #### Split processor
     
    -In the Split processor, we simply split a predefined text (the content is simplified for conciseness) and send out each split word to Sum.
    +In the `Split` processor, we simply split a predefined text (the content is simplified for conciseness) and send out each split word to Sum.
    --- End diff --
    
    "Sum" => "`Sum`"


