beam-commits mailing list archives

From m..@apache.org
Subject [13/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout
Date Tue, 15 Mar 2016 16:07:08 GMT
[flink] restructure and cleanup Maven layout

- move examples to separate Maven project
- remove unused dependencies
- simplify Maven configuration
- introduce Scala suffixes to modules
- unify indentation
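
For orientation, a sketch of the module layout this produces, as implied by the
diffstat and the new pom files below:

    runners/flink/               (parent: flink-runner-parent)
    ├── runner/                  (flink-runner_2.10)
    └── examples/                (flink-runner-examples_2.10)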


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/071e4dd6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/071e4dd6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/071e4dd6

Branch: refs/heads/master
Commit: 071e4dd67021346b0cab2aafa0900ec7e34c4ef8
Parents: c0b9fc6
Author: Maximilian Michels <mxm@apache.org>
Authored: Tue Mar 15 16:03:14 2016 +0100
Committer: Maximilian Michels <mxm@apache.org>
Committed: Tue Mar 15 17:05:15 2016 +0100

----------------------------------------------------------------------
 runners/flink/README.md                         | 101 +--
 runners/flink/examples/pom.xml                  |  89 +++
 .../beam/runners/flink/examples/TFIDF.java      | 452 ++++++++++++
 .../beam/runners/flink/examples/WordCount.java  | 113 +++
 .../flink/examples/streaming/AutoComplete.java  | 387 ++++++++++
 .../flink/examples/streaming/JoinExamples.java  | 158 ++++
 .../KafkaWindowedWordCountExample.java          | 143 ++++
 .../examples/streaming/WindowedWordCount.java   | 130 ++++
 runners/flink/pom.xml                           | 416 +++++------
 runners/flink/runner/pom.xml                    | 145 ++++
 .../FlinkPipelineExecutionEnvironment.java      | 269 +++++++
 .../runners/flink/FlinkPipelineOptions.java     |  93 +++
 .../beam/runners/flink/FlinkPipelineRunner.java | 206 ++++++
 .../beam/runners/flink/FlinkRunnerResult.java   |  68 ++
 .../apache/beam/runners/flink/io/ConsoleIO.java |  82 +++
 .../FlinkBatchPipelineTranslator.java           | 153 ++++
 .../FlinkBatchTransformTranslators.java         | 594 +++++++++++++++
 .../FlinkBatchTranslationContext.java           | 129 ++++
 .../translation/FlinkPipelineTranslator.java    |  36 +
 .../FlinkStreamingPipelineTranslator.java       | 150 ++++
 .../FlinkStreamingTransformTranslators.java     | 406 +++++++++++
 .../FlinkStreamingTranslationContext.java       |  89 +++
 .../FlinkCoGroupKeyedListAggregator.java        |  60 ++
 .../functions/FlinkCreateFunction.java          |  62 ++
 .../functions/FlinkDoFnFunction.java            | 204 ++++++
 .../FlinkKeyedListAggregationFunction.java      |  77 ++
 .../functions/FlinkMultiOutputDoFnFunction.java | 177 +++++
 .../FlinkMultiOutputPruningFunction.java        |  43 ++
 .../functions/FlinkPartialReduceFunction.java   |  60 ++
 .../functions/FlinkReduceFunction.java          |  57 ++
 .../flink/translation/functions/UnionCoder.java | 150 ++++
 .../translation/types/CoderComparator.java      | 216 ++++++
 .../translation/types/CoderTypeInformation.java | 116 +++
 .../translation/types/CoderTypeSerializer.java  | 152 ++++
 .../types/InspectableByteArrayOutputStream.java |  34 +
 .../translation/types/KvCoderComperator.java    | 264 +++++++
 .../types/KvCoderTypeInformation.java           | 186 +++++
 .../types/VoidCoderTypeSerializer.java          | 112 +++
 .../wrappers/CombineFnAggregatorWrapper.java    |  92 +++
 .../wrappers/DataInputViewWrapper.java          |  59 ++
 .../wrappers/DataOutputViewWrapper.java         |  52 ++
 .../SerializableFnAggregatorWrapper.java        |  91 +++
 .../translation/wrappers/SinkOutputFormat.java  | 121 ++++
 .../translation/wrappers/SourceInputFormat.java | 164 +++++
 .../translation/wrappers/SourceInputSplit.java  |  52 ++
 .../streaming/FlinkAbstractParDoWrapper.java    | 266 +++++++
 .../FlinkGroupAlsoByWindowWrapper.java          | 631 ++++++++++++++++
 .../streaming/FlinkGroupByKeyWrapper.java       |  66 ++
 .../streaming/FlinkParDoBoundMultiWrapper.java  |  77 ++
 .../streaming/FlinkParDoBoundWrapper.java       | 100 +++
 .../io/FlinkStreamingCreateFunction.java        |  65 ++
 .../streaming/io/UnboundedFlinkSource.java      |  82 +++
 .../streaming/io/UnboundedSocketSource.java     | 233 ++++++
 .../streaming/io/UnboundedSourceWrapper.java    | 134 ++++
 .../state/AbstractFlinkTimerInternals.java      | 128 ++++
 .../streaming/state/FlinkStateInternals.java    | 715 +++++++++++++++++++
 .../streaming/state/StateCheckpointReader.java  |  91 +++
 .../streaming/state/StateCheckpointUtils.java   | 155 ++++
 .../streaming/state/StateCheckpointWriter.java  | 129 ++++
 .../wrappers/streaming/state/StateType.java     |  73 ++
 .../runner/src/main/resources/log4j.properties  |  23 +
 .../apache/beam/runners/flink/AvroITCase.java   | 127 ++++
 .../beam/runners/flink/FlattenizeITCase.java    |  74 ++
 .../beam/runners/flink/FlinkTestPipeline.java   |  72 ++
 .../beam/runners/flink/JoinExamplesITCase.java  | 101 +++
 .../runners/flink/MaybeEmptyTestITCase.java     |  65 ++
 .../runners/flink/ParDoMultiOutputITCase.java   | 100 +++
 .../beam/runners/flink/ReadSourceITCase.java    | 165 +++++
 .../flink/RemoveDuplicatesEmptyITCase.java      |  70 ++
 .../runners/flink/RemoveDuplicatesITCase.java   |  71 ++
 .../beam/runners/flink/SideInputITCase.java     |  69 ++
 .../apache/beam/runners/flink/TfIdfITCase.java  |  78 ++
 .../beam/runners/flink/WordCountITCase.java     |  75 ++
 .../runners/flink/WordCountJoin2ITCase.java     | 138 ++++
 .../runners/flink/WordCountJoin3ITCase.java     | 156 ++++
 .../beam/runners/flink/WriteSinkITCase.java     | 158 ++++
 .../flink/streaming/GroupAlsoByWindowTest.java  | 508 +++++++++++++
 .../flink/streaming/GroupByNullKeyTest.java     | 123 ++++
 .../flink/streaming/StateSerializationTest.java | 305 ++++++++
 .../streaming/TopWikipediaSessionsITCase.java   | 134 ++++
 .../beam/runners/flink/util/JoinExamples.java   | 160 +++++
 .../src/test/resources/log4j-test.properties    |  27 +
 .../FlinkPipelineExecutionEnvironment.java      | 269 -------
 .../runners/flink/FlinkPipelineOptions.java     |  93 ---
 .../beam/runners/flink/FlinkPipelineRunner.java | 206 ------
 .../beam/runners/flink/FlinkRunnerResult.java   |  68 --
 .../beam/runners/flink/examples/TFIDF.java      | 452 ------------
 .../beam/runners/flink/examples/WordCount.java  | 113 ---
 .../flink/examples/streaming/AutoComplete.java  | 387 ----------
 .../flink/examples/streaming/JoinExamples.java  | 158 ----
 .../KafkaWindowedWordCountExample.java          | 143 ----
 .../examples/streaming/WindowedWordCount.java   | 130 ----
 .../apache/beam/runners/flink/io/ConsoleIO.java |  82 ---
 .../FlinkBatchPipelineTranslator.java           | 153 ----
 .../FlinkBatchTransformTranslators.java         | 594 ---------------
 .../FlinkBatchTranslationContext.java           | 129 ----
 .../translation/FlinkPipelineTranslator.java    |  36 -
 .../FlinkStreamingPipelineTranslator.java       | 150 ----
 .../FlinkStreamingTransformTranslators.java     | 406 -----------
 .../FlinkStreamingTranslationContext.java       |  89 ---
 .../FlinkCoGroupKeyedListAggregator.java        |  60 --
 .../functions/FlinkCreateFunction.java          |  62 --
 .../functions/FlinkDoFnFunction.java            | 204 ------
 .../FlinkKeyedListAggregationFunction.java      |  77 --
 .../functions/FlinkMultiOutputDoFnFunction.java | 177 -----
 .../FlinkMultiOutputPruningFunction.java        |  43 --
 .../functions/FlinkPartialReduceFunction.java   |  60 --
 .../functions/FlinkReduceFunction.java          |  57 --
 .../flink/translation/functions/UnionCoder.java | 150 ----
 .../translation/types/CoderComparator.java      | 216 ------
 .../translation/types/CoderTypeInformation.java | 116 ---
 .../translation/types/CoderTypeSerializer.java  | 152 ----
 .../types/InspectableByteArrayOutputStream.java |  34 -
 .../translation/types/KvCoderComperator.java    | 264 -------
 .../types/KvCoderTypeInformation.java           | 186 -----
 .../types/VoidCoderTypeSerializer.java          | 112 ---
 .../wrappers/CombineFnAggregatorWrapper.java    |  92 ---
 .../wrappers/DataInputViewWrapper.java          |  59 --
 .../wrappers/DataOutputViewWrapper.java         |  52 --
 .../SerializableFnAggregatorWrapper.java        |  91 ---
 .../translation/wrappers/SinkOutputFormat.java  | 121 ----
 .../translation/wrappers/SourceInputFormat.java | 164 -----
 .../translation/wrappers/SourceInputSplit.java  |  52 --
 .../streaming/FlinkAbstractParDoWrapper.java    | 266 -------
 .../FlinkGroupAlsoByWindowWrapper.java          | 631 ----------------
 .../streaming/FlinkGroupByKeyWrapper.java       |  66 --
 .../streaming/FlinkParDoBoundMultiWrapper.java  |  77 --
 .../streaming/FlinkParDoBoundWrapper.java       | 100 ---
 .../io/FlinkStreamingCreateFunction.java        |  65 --
 .../streaming/io/UnboundedFlinkSource.java      |  82 ---
 .../streaming/io/UnboundedSocketSource.java     | 233 ------
 .../streaming/io/UnboundedSourceWrapper.java    | 134 ----
 .../state/AbstractFlinkTimerInternals.java      | 128 ----
 .../streaming/state/FlinkStateInternals.java    | 715 -------------------
 .../streaming/state/StateCheckpointReader.java  |  91 ---
 .../streaming/state/StateCheckpointUtils.java   | 155 ----
 .../streaming/state/StateCheckpointWriter.java  | 129 ----
 .../wrappers/streaming/state/StateType.java     |  73 --
 .../flink/src/main/resources/log4j.properties   |  23 -
 .../apache/beam/runners/flink/AvroITCase.java   | 127 ----
 .../beam/runners/flink/FlattenizeITCase.java    |  74 --
 .../beam/runners/flink/FlinkTestPipeline.java   |  72 --
 .../beam/runners/flink/JoinExamplesITCase.java  | 101 ---
 .../runners/flink/MaybeEmptyTestITCase.java     |  65 --
 .../runners/flink/ParDoMultiOutputITCase.java   | 100 ---
 .../beam/runners/flink/ReadSourceITCase.java    | 165 -----
 .../flink/RemoveDuplicatesEmptyITCase.java      |  70 --
 .../runners/flink/RemoveDuplicatesITCase.java   |  71 --
 .../beam/runners/flink/SideInputITCase.java     |  69 --
 .../apache/beam/runners/flink/TfIdfITCase.java  |  78 --
 .../beam/runners/flink/WordCountITCase.java     |  76 --
 .../runners/flink/WordCountJoin2ITCase.java     | 138 ----
 .../runners/flink/WordCountJoin3ITCase.java     | 156 ----
 .../beam/runners/flink/WriteSinkITCase.java     | 158 ----
 .../flink/streaming/GroupAlsoByWindowTest.java  | 508 -------------
 .../flink/streaming/GroupByNullKeyTest.java     | 123 ----
 .../flink/streaming/StateSerializationTest.java | 305 --------
 .../streaming/TopWikipediaSessionsITCase.java   | 134 ----
 .../beam/runners/flink/util/JoinExamples.java   | 160 -----
 .../src/test/resources/log4j-test.properties    |  27 -
 runners/pom.xml                                 |  62 +-
 161 files changed, 12480 insertions(+), 12340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/README.md
----------------------------------------------------------------------
diff --git a/runners/flink/README.md b/runners/flink/README.md
index 8d71680..7418f16 100644
--- a/runners/flink/README.md
+++ b/runners/flink/README.md
@@ -84,7 +84,7 @@ Flink-Runner is now installed in your local maven repository.
 ## Executing an example
 
 Next, let's run the classic WordCount example. It's semantically identical to
-the example provided with ApacheBeam. Only this time, we chose the
+the example provided with Apache Beam. Only this time, we chose the
 `FlinkPipelineRunner` to execute the WordCount on top of Flink.
 
 Here's an excerpt from the WordCount class file:
@@ -107,13 +107,14 @@ p.run();
 
 To execute the example, let's first get some sample data:
 
-    curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > kinglear.txt
+    curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > examples/kinglear.txt
 
 Then let's run the included WordCount locally on your machine:
 
+    cd examples
     mvn exec:exec -Dinput=kinglear.txt -Doutput=wordcounts.txt
 
-Congratulations, you have run your first ApacheBeam program on top of Apache Flink!
+Congratulations, you have run your first Apache Beam program on top of Apache Flink!
 
 
 # Running Beam programs on a Flink cluster
@@ -129,53 +130,53 @@ The contents of the root `pom.xml` should be slightly changed afterwards (explana
 ```xml
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <groupId>com.mycompany.beam</groupId>
-    <artifactId>beam-test</artifactId>
-    <version>1.0</version>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.beam</groupId>
-            <artifactId>flink-runner</artifactId>
-            <version>0.3-SNAPSHOT</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>2.4.1</version>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>org.apache.beam.runners.flink.examples.WordCount</mainClass>
-                                </transformer>
-                            </transformers>
-                            <artifactSet>
-                                <excludes>
-                                    <exclude>org.apache.flink:*</exclude>
-                                </excludes>
-                            </artifactSet>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
-        </plugins>
-
-    </build>
+     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.mycompany.beam</groupId>
+  <artifactId>beam-test</artifactId>
+  <version>1.0</version>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>flink-runner_2.10</artifactId>
+      <version>0.4-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.4.1</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass>org.apache.beam.runners.flink.examples.WordCount</mainClass>
+                </transformer>
+              </transformers>
+              <artifactSet>
+                <excludes>
+                  <exclude>org.apache.flink:*</exclude>
+                </excludes>
+              </artifactSet>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+
+  </build>
 
 </project>
 ```
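
With this pom in place, `mvn package` builds a shaded jar whose manifest main
class is set by the transformer above, while Flink's own classes are excluded
because the cluster already provides them. Assuming a standard Flink
installation, the job could then be submitted with Flink's command-line client,
for example:

    bin/flink run target/beam-test-1.0.jar --input=kinglear.txt --output=wordcounts.txt

(The jar name follows from the `beam-test` artifact above; `--input` and
`--output` are the WordCount options shown in the examples module.)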

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
new file mode 100644
index 0000000..91cc1b7
--- /dev/null
+++ b/runners/flink/examples/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+         xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>flink-runner-parent</artifactId>
+    <version>0.4-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>flink-runner-examples_2.10</artifactId>
+  <version>0.4-SNAPSHOT</version>
+
+  <name>Flink Beam Runner Examples</name>
+  <packaging>jar</packaging>
+
+  <inceptionYear>2015</inceptionYear>
+
+  <licenses>
+    <license>
+      <name>The Apache Software License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+    </license>
+  </licenses>
+
+  <properties>
+    <!-- Default parameters for mvn exec:exec -->
+    <clazz>org.apache.beam.runners.flink.examples.WordCount</clazz>
+    <input>kinglear.txt</input>
+    <output>wordcounts.txt</output>
+    <parallelism>1</parallelism>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>flink-runner_2.10</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
+        <configuration>
+          <executable>java</executable>
+          <arguments>
+            <argument>-classpath</argument>
+            <classpath/>
+            <argument>${clazz}</argument>
+            <argument>--input=${input}</argument>
+            <argument>--output=${output}</argument>
+            <argument>--parallelism=${parallelism}</argument>
+          </arguments>
+        </configuration>
+      </plugin>
+
+    </plugins>
+
+  </build>
+
+</project>
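
A note on the exec-maven-plugin configuration above: the `clazz`, `input`,
`output`, and `parallelism` properties are passed straight through as program
arguments, so each can be overridden on the command line with `-D`. For
example, to run the TFIDF example from this module instead of the default
WordCount (paths are illustrative):

    mvn exec:exec -Dclazz=org.apache.beam.runners.flink.examples.TFIDF \
        -Dinput=/path/to/documents/ -Doutput=/tmp/tfidf -Dparallelism=4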

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
new file mode 100644
index 0000000..ab23b92
--- /dev/null
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -0,0 +1,452 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.beam.runners.flink.examples;
+
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.GcsOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.Keys;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.transforms.Values;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.WithKeys;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PDone;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * An example that computes a basic TF-IDF search table for a directory or GCS prefix.
+ *
+ * <p> Concepts: joining data; side inputs; logging
+ *
+ * <p> To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ * }</pre>
+ * and a local output file or output prefix on GCS:
+ * <pre>{@code
+ *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+ * }</pre>
+ *
+ * <p> To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ * }</pre>
+ * and an output prefix on GCS:
+ * <pre>{@code --output=gs://YOUR_OUTPUT_PREFIX }</pre>
+ *
+ * <p> The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with
+ * {@code --input}.
+ */
+public class TFIDF {
+  /**
+   * Options supported by {@link TFIDF}.
+   * <p>
+   * Inherits standard configuration options.
+   */
+  private interface Options extends PipelineOptions, FlinkPipelineOptions {
+    @Description("Path to the directory or GCS prefix containing files to read from")
+    @Default.String("gs://dataflow-samples/shakespeare/")
+    String getInput();
+    void setInput(String value);
+
+    @Description("Prefix of output URI to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  /**
+   * Lists documents contained beneath the {@code options.input} prefix/directory.
+   */
+  public static Set<URI> listInputDocuments(Options options)
+      throws URISyntaxException, IOException {
+    URI baseUri = new URI(options.getInput());
+
+    // List all documents in the directory or GCS prefix.
+    URI absoluteUri;
+    if (baseUri.getScheme() != null) {
+      absoluteUri = baseUri;
+    } else {
+      absoluteUri = new URI(
+          "file",
+          baseUri.getAuthority(),
+          baseUri.getPath(),
+          baseUri.getQuery(),
+          baseUri.getFragment());
+    }
+
+    Set<URI> uris = new HashSet<>();
+    if (absoluteUri.getScheme().equals("file")) {
+      File directory = new File(absoluteUri);
+      for (String entry : directory.list()) {
+        File path = new File(directory, entry);
+        uris.add(path.toURI());
+      }
+    } else if (absoluteUri.getScheme().equals("gs")) {
+      GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
+      URI gcsUriGlob = new URI(
+          absoluteUri.getScheme(),
+          absoluteUri.getAuthority(),
+          absoluteUri.getPath() + "*",
+          absoluteUri.getQuery(),
+          absoluteUri.getFragment());
+      for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) {
+        uris.add(entry.toUri());
+      }
+    }
+
+    return uris;
+  }
+
+  /**
+   * Reads the documents at the provided uris and returns all lines
+ * from the documents, each tagged with the document it came from.
+   */
+  public static class ReadDocuments
+      extends PTransform<PInput, PCollection<KV<URI, String>>> {
+    private static final long serialVersionUID = 0;
+
+    private Iterable<URI> uris;
+
+    public ReadDocuments(Iterable<URI> uris) {
+      this.uris = uris;
+    }
+
+    @Override
+    public Coder<?> getDefaultOutputCoder() {
+      return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of());
+    }
+
+    @Override
+    public PCollection<KV<URI, String>> apply(PInput input) {
+      Pipeline pipeline = input.getPipeline();
+
+      // Create one TextIO.Read transform for each document
+      // and add its output to a PCollectionList
+      PCollectionList<KV<URI, String>> urisToLines =
+          PCollectionList.empty(pipeline);
+
+      // TextIO.Read supports:
+      //  - file: URIs and paths locally
+      //  - gs: URIs on the service
+      for (final URI uri : uris) {
+        String uriString;
+        if (uri.getScheme().equals("file")) {
+          uriString = new File(uri).getPath();
+        } else {
+          uriString = uri.toString();
+        }
+
+        PCollection<KV<URI, String>> oneUriToLines = pipeline
+            .apply(TextIO.Read.from(uriString)
+                .named("TextIO.Read(" + uriString + ")"))
+            .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
+
+        urisToLines = urisToLines.and(oneUriToLines);
+      }
+
+      return urisToLines.apply(Flatten.<KV<URI, String>>pCollections());
+    }
+  }
+
+  /**
+   * A transform containing a basic TF-IDF pipeline. The input consists of KV objects
+   * where the key is the document's URI and the value is a piece
+   * of the document's content. The output is mapping from terms to
+   * scores for each document URI.
+   */
+  public static class ComputeTfIdf
+      extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
+    private static final long serialVersionUID = 0;
+
+    public ComputeTfIdf() { }
+
+    @Override
+    public PCollection<KV<String, KV<URI, Double>>> apply(
+        PCollection<KV<URI, String>> uriToContent) {
+
+      // Compute the total number of documents, and
+      // prepare this singleton PCollectionView for
+      // use as a side input.
+      final PCollectionView<Long> totalDocuments =
+          uriToContent
+              .apply("GetURIs", Keys.<URI>create())
+              .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
+              .apply(Count.<URI>globally())
+              .apply(View.<Long>asSingleton());
+
+      // Create a collection of pairs mapping a URI to each
+      // of the words in the document associated with that URI.
+      PCollection<KV<URI, String>> uriToWords = uriToContent
+          .apply(ParDo.named("SplitWords").of(
+              new DoFn<KV<URI, String>, KV<URI, String>>() {
+                private static final long serialVersionUID = 0;
+
+                @Override
+                public void processElement(ProcessContext c) {
+                  URI uri = c.element().getKey();
+                  String line = c.element().getValue();
+                  for (String word : line.split("\\W+")) {
+                    // Log INFO messages when the word “love” is found.
+                    if (word.toLowerCase().equals("love")) {
+                      LOG.info("Found {}", word.toLowerCase());
+                    }
+
+                    if (!word.isEmpty()) {
+                      c.output(KV.of(uri, word.toLowerCase()));
+                    }
+                  }
+                }
+              }));
+
+      // Compute a mapping from each word to the total
+      // number of documents in which it appears.
+      PCollection<KV<String, Long>> wordToDocCount = uriToWords
+          .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
+          .apply(Values.<String>create())
+          .apply("CountDocs", Count.<String>perElement());
+
+      // Compute a mapping from each URI to the total
+      // number of words in the document associated with that URI.
+      PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
+          .apply("GetURIs2", Keys.<URI>create())
+          .apply("CountWords", Count.<URI>perElement());
+
+      // Count, for each (URI, word) pair, the number of
+      // occurrences of that word in the document associated
+      // with the URI.
+      PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
+          .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
+
+      // Convert the above mapping from (URI, word) pairs
+      // to counts into an isomorphic mapping
+      // from URI to (word, count) pairs, to prepare for a join
+      // by the URI key.
+      PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
+          .apply(ParDo.named("ShiftKeys").of(
+              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+                private static final long serialVersionUID = 0;
+
+                @Override
+                public void processElement(ProcessContext c) {
+                  URI uri = c.element().getKey().getKey();
+                  String word = c.element().getKey().getValue();
+                  Long occurrences = c.element().getValue();
+                  c.output(KV.of(uri, KV.of(word, occurrences)));
+                }
+              }));
+
+      // Prepare to join the mapping of URI to (word, count) pairs with
+      // the mapping of URI to total word counts, by associating
+      // each of the input PCollection<KV<URI, ...>> with
+      // a tuple tag. Each input must have the same key type, URI
+      // in this case. The type parameter of the tuple tag matches
+      // the types of the values for each collection.
+      final TupleTag<Long> wordTotalsTag = new TupleTag<>();
+      final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<>();
+      KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
+          .of(wordTotalsTag, uriToWordTotal)
+          .and(wordCountsTag, uriToWordAndCount);
+
+      // Perform a CoGroupByKey (a sort of pre-join) on the prepared
+      // inputs. This yields a mapping from URI to a CoGbkResult
+      // (CoGroupByKey Result). The CoGbkResult is a mapping
+      // from the above tuple tags to the values in each input
+      // associated with a particular URI. In this case, each
+      // KV<URI, CoGbkResult> groups a URI with the total number of
+      // words in that document as well as all the (word, count)
+      // pairs for particular words.
+      PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
+          .apply("CoGroupByUri", CoGroupByKey.<URI>create());
+
+      // Compute a mapping from each word to a (URI, term frequency)
+      // pair for each URI. A word's term frequency for a document
+      // is simply the number of times that word occurs in the document
+      // divided by the total number of words in the document.
+      PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
+          .apply(ParDo.named("ComputeTermFrequencies").of(
+              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                private static final long serialVersionUID = 0;
+
+                @Override
+                public void processElement(ProcessContext c) {
+                  URI uri = c.element().getKey();
+                  Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
+
+                  for (KV<String, Long> wordAndCount
+                      : c.element().getValue().getAll(wordCountsTag)) {
+                    String word = wordAndCount.getKey();
+                    Long wordCount = wordAndCount.getValue();
+                    Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
+                    c.output(KV.of(word, KV.of(uri, termFrequency)));
+                  }
+                }
+              }));
+
+      // Compute a mapping from each word to its document frequency.
+      // A word's document frequency in a corpus is the number of
+      // documents in which the word appears divided by the total
+      // number of documents in the corpus. Note how the total number of
+      // documents is passed as a side input; the same value is
+      // presented to each invocation of the DoFn.
+      PCollection<KV<String, Double>> wordToDf = wordToDocCount
+          .apply(ParDo
+              .named("ComputeDocFrequencies")
+              .withSideInputs(totalDocuments)
+              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+                private static final long serialVersionUID = 0;
+
+                @Override
+                public void processElement(ProcessContext c) {
+                  String word = c.element().getKey();
+                  Long documentCount = c.element().getValue();
+                  Long documentTotal = c.sideInput(totalDocuments);
+                  Double documentFrequency = documentCount.doubleValue()
+                      / documentTotal.doubleValue();
+
+                  c.output(KV.of(word, documentFrequency));
+                }
+              }));
+
+      // Join the term frequency and document frequency
+      // collections, each keyed on the word.
+      final TupleTag<KV<URI, Double>> tfTag = new TupleTag<>();
+      final TupleTag<Double> dfTag = new TupleTag<>();
+      PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
+          .of(tfTag, wordToUriAndTf)
+          .and(dfTag, wordToDf)
+          .apply(CoGroupByKey.<String>create());
+
+      // Compute a mapping from each word to a (URI, TF-IDF) score
+      // for each URI. There are a variety of definitions of TF-IDF
+      // ("term frequency - inverse document frequency") score;
+      // here we use a basic version that multiplies the term frequency
+      // by the log of the inverse document frequency.
+
+      return wordToUriAndTfAndDf
+          .apply(ParDo.named("ComputeTfIdf").of(
+              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                private static final long serialVersionUID = 0;
+
+                @Override
+                public void processElement(ProcessContext c) {
+                  String word = c.element().getKey();
+                  Double df = c.element().getValue().getOnly(dfTag);
+
+                  for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
+                    URI uri = uriAndTf.getKey();
+                    Double tf = uriAndTf.getValue();
+                    Double tfIdf = tf * Math.log(1 / df);
+                    c.output(KV.of(word, KV.of(uri, tfIdf)));
+                  }
+                }
+              }));
+    }
+
+    // Instantiate Logger.
+    // It is suggested that the user specify the class name of the containing class
+    // (in this case ComputeTfIdf).
+    private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
+  }
+
+  /**
+   * A {@link PTransform} to write, in CSV format, a mapping from term and URI
+   * to score.
+   */
+  public static class WriteTfIdf
+      extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> {
+    private static final long serialVersionUID = 0;
+
+    private String output;
+
+    public WriteTfIdf(String output) {
+      this.output = output;
+    }
+
+    @Override
+    public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
+      return wordToUriAndTfIdf
+          .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() {
+            private static final long serialVersionUID = 0;
+
+            @Override
+            public void processElement(ProcessContext c) {
+              c.output(String.format("%s,\t%s,\t%f",
+                  c.element().getKey(),
+                  c.element().getValue().getKey(),
+                  c.element().getValue().getValue()));
+            }
+          }))
+          .apply(TextIO.Write
+              .to(output)
+              .withSuffix(".csv"));
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+    options.setRunner(FlinkPipelineRunner.class);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+
+    pipeline
+        .apply(new ReadDocuments(listInputDocuments(options)))
+        .apply(new ComputeTfIdf())
+        .apply(new WriteTfIdf(options.getOutput()));
+
+    pipeline.run();
+  }
+}
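
For reference, the score computed by the final ParDo above is
`tf * ln(1 / df)`. A minimal standalone restatement in plain Java (class and
parameter names are illustrative, not part of the example):

    final class TfIdfScore {
      /** Mirrors the expression in ComputeTfIdf: termFrequency * ln(1 / documentFrequency). */
      static double score(long occurrencesInDoc, long wordsInDoc,
                          long docsContainingWord, long totalDocs) {
        double tf = occurrencesInDoc / (double) wordsInDoc;   // term frequency
        double df = docsContainingWord / (double) totalDocs;  // document frequency
        return tf * Math.log(1 / df);                         // log of the inverse document frequency
      }
    }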

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
new file mode 100644
index 0000000..7d12fed
--- /dev/null
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.examples;
+
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+public class WordCount {
+
+  public static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  public static class CountWords extends PTransform<PCollection<String>,
+                    PCollection<KV<String, Long>>> {
+    @Override
+    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+
+      // Convert lines of text into individual words.
+      PCollection<String> words = lines.apply(
+          ParDo.of(new ExtractWordsFn()));
+
+      // Count the number of times each word occurs.
+      PCollection<KV<String, Long>> wordCounts =
+          words.apply(Count.<String>perElement());
+
+      return wordCounts;
+    }
+  }
+
+  /** A SimpleFunction that converts a Word and Count into a printable string. */
+  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
+    @Override
+    public String apply(KV<String, Long> input) {
+      return input.getKey() + ": " + input.getValue();
+    }
+  }
+
+  /**
+   * Options supported by {@link WordCount}.
+   * <p>
+   * Inherits standard configuration options.
+   */
+  public interface Options extends PipelineOptions, FlinkPipelineOptions {
+    @Description("Path of the file to read from")
+    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
+    String getInput();
+    void setInput(String value);
+
+    @Description("Path of the file to write to")
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation()
+        .as(Options.class);
+    options.setRunner(FlinkPipelineRunner.class);
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+        .apply(new CountWords())
+        .apply(MapElements.via(new FormatAsTextFn()))
+        .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+
+    p.run();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
new file mode 100644
index 0000000..8168122
--- /dev/null
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -0,0 +1,387 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.beam.runners.flink.examples.streaming;
+
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.*;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.joda.time.Duration;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * To run the example, first open a socket on a terminal by executing the command:
+ * <ul>
+ *     <li>
+ *     <code>nc -lk 9999</code>
+ *     </li>
+ * </ul>
+ * and then launch the example. Now whatever you type in the terminal is going to be
+ * the input to the program.
+ */
+public class AutoComplete {
+
+  /**
+   * A PTransform that takes as input a list of tokens and returns
+   * the most common tokens per prefix.
+   */
+  public static class ComputeTopCompletions
+      extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> {
+    private static final long serialVersionUID = 0;
+
+    private final int candidatesPerPrefix;
+    private final boolean recursive;
+
+    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.recursive = recursive;
+    }
+
+    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) {
+      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+    }
+
+    @Override
+    public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
+      PCollection<CompletionCandidate> candidates = input
+        // First count how often each token appears.
+        .apply(new Count.PerElement<String>())
+
+        // Map the KV outputs of Count into our own CompletionCandidate class.
+        .apply(ParDo.named("CreateCompletionCandidates").of(
+            new DoFn<KV<String, Long>, CompletionCandidate>() {
+              private static final long serialVersionUID = 0;
+
+              @Override
+              public void processElement(ProcessContext c) {
+                CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue());
+                c.output(cand);
+              }
+            }));
+
+      // Compute the top via either a flat or recursive algorithm.
+      if (recursive) {
+        return candidates
+          .apply(new ComputeTopRecursive(candidatesPerPrefix, 1))
+          .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
+      } else {
+        return candidates
+          .apply(new ComputeTopFlat(candidatesPerPrefix, 1));
+      }
+    }
+  }
+
+  /**
+   * Lower latency, but more expensive.
+   */
+  private static class ComputeTopFlat
+      extends PTransform<PCollection<CompletionCandidate>,
+                         PCollection<KV<String, List<CompletionCandidate>>>> {
+    private static final long serialVersionUID = 0;
+
+    private final int candidatesPerPrefix;
+    private final int minPrefix;
+
+    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.minPrefix = minPrefix;
+    }
+
+    @Override
+    public PCollection<KV<String, List<CompletionCandidate>>> apply(
+        PCollection<CompletionCandidate> input) {
+      return input
+        // For each completion candidate, map it to all prefixes.
+        .apply(ParDo.of(new AllPrefixes(minPrefix)))
+
+        // Find and return the top candidates for each prefix.
+        .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)
+             .withHotKeyFanout(new HotKeyFanout()));
+    }
+
+    private static class HotKeyFanout implements SerializableFunction<String, Integer> {
+      private static final long serialVersionUID = 0;
+
+      @Override
+      public Integer apply(String input) {
+        return (int) Math.pow(4, 5 - input.length());
+      }
+    }
+  }
+
+  /**
+   * Cheaper but higher latency.
+   *
+   * <p> Returns two PCollections: the first contains the top prefixes of size
+   * greater than minPrefix, and the second contains the top prefixes of size
+   * exactly minPrefix.
+   */
+  private static class ComputeTopRecursive
+      extends PTransform<PCollection<CompletionCandidate>,
+                         PCollectionList<KV<String, List<CompletionCandidate>>>> {
+    private static final long serialVersionUID = 0;
+
+    private final int candidatesPerPrefix;
+    private final int minPrefix;
+
+    public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.minPrefix = minPrefix;
+    }
+
+    private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> {
+      private static final long serialVersionUID = 0;
+
+      @Override
+      public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) {
+        return elem.getKey().length() > minPrefix ? 0 : 1;
+      }
+    }
+
+    private static class FlattenTops
+        extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+      private static final long serialVersionUID = 0;
+
+      @Override
+      public void processElement(ProcessContext c) {
+        for (CompletionCandidate cc : c.element().getValue()) {
+          c.output(cc);
+        }
+      }
+    }
+
+    @Override
+    public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
+          PCollection<CompletionCandidate> input) {
+        if (minPrefix > 10) {
+          // Base case, partitioning to return the output in the expected format.
+          return input
+            .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix))
+            .apply(Partition.of(2, new KeySizePartitionFn()));
+        } else {
+          // If a candidate is in the top N for prefix a...b, it must also be in the top
+          // N for a...bX for every X, which is typically a much smaller set to consider.
+          // First, compute the top candidate for prefixes of size at least minPrefix + 1.
+          PCollectionList<KV<String, List<CompletionCandidate>>> larger = input
+            .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1));
+          // Consider the top candidates for each prefix of length minPrefix + 1...
+          PCollection<KV<String, List<CompletionCandidate>>> small =
+            PCollectionList
+            .of(larger.get(1).apply(ParDo.of(new FlattenTops())))
+            // ...together with those (previously excluded) candidates of length
+            // exactly minPrefix...
+            .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() {
+                    private static final long serialVersionUID = 0;
+
+                    @Override
+                    public Boolean apply(CompletionCandidate c) {
+                      return c.getValue().length() == minPrefix;
+                    }
+                  })))
+            .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
+            // ...set the key to be the minPrefix-length prefix...
+            .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
+            // ...and (re)apply the Top operator to all of them together.
+            .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix));
+
+          PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger
+              .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
+
+          return PCollectionList.of(flattenLarger).and(small);
+        }
+    }
+  }
+
+  /**
+   * A DoFn that keys each candidate by all its prefixes.
+   */
+  private static class AllPrefixes
+      extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+    private static final long serialVersionUID = 0;
+
+    private final int minPrefix;
+    private final int maxPrefix;
+    public AllPrefixes(int minPrefix) {
+      this(minPrefix, Integer.MAX_VALUE);
+    }
+    public AllPrefixes(int minPrefix, int maxPrefix) {
+      this.minPrefix = minPrefix;
+      this.maxPrefix = maxPrefix;
+    }
+    @Override
+    public void processElement(ProcessContext c) {
+      String word = c.element().value;
+      for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
+        KV<String, CompletionCandidate> kv = KV.of(word.substring(0, i), c.element());
+        c.output(kv);
+      }
+    }
+  }
+
+  /**
+   * Class used to store tag-count pairs.
+   */
+  @DefaultCoder(AvroCoder.class)
+  static class CompletionCandidate implements Comparable<CompletionCandidate> {
+    private long count;
+    private String value;
+
+    public CompletionCandidate(String value, long count) {
+      this.value = value;
+      this.count = count;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    // Empty constructor required for Avro decoding.
+    @SuppressWarnings("unused")
+    public CompletionCandidate() {}
+
+    @Override
+    public int compareTo(CompletionCandidate o) {
+      if (this.count < o.count) {
+        return -1;
+      } else if (this.count == o.count) {
+        return this.value.compareTo(o.value);
+      } else {
+        return 1;
+      }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof CompletionCandidate) {
+        CompletionCandidate that = (CompletionCandidate) other;
+        return this.count == that.count && this.value.equals(that.value);
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return Long.valueOf(count).hashCode() ^ value.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return "CompletionCandidate[" + value + ", " + count + "]";
+    }
+  }
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+            createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  /**
+   * Takes as input the top candidates per prefix, and emits a formatted
+   * string suitable for writing to a per-task local file.
+   */
+  static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String>
+          implements DoFn.RequiresWindowAccess {
+
+    private static final long serialVersionUID = 0;
+
+    @Override
+    public void processElement(ProcessContext c) {
+      StringBuilder str = new StringBuilder();
+      KV<String, List<CompletionCandidate>> elem = c.element();
+
+      str.append(elem.getKey() + " @ " + c.window() + " -> ");
+      for (CompletionCandidate cand : elem.getValue()) {
+        str.append(cand.toString() + " ");
+      }
+      System.out.println(str.toString());
+      c.output(str.toString());
+    }
+  }
+
+  /**
+   * Options supported by this class.
+   *
+   * <p> Inherits standard Dataflow configuration options.
+   */
+  private interface Options extends WindowedWordCount.StreamingWordCountOptions {
+    @Description("Whether to use the recursive algorithm")
+    @Default.Boolean(true)
+    Boolean getRecursive();
+    void setRecursive(Boolean value);
+  }
+
+  public static void main(String[] args) throws IOException {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    options.setStreaming(true);
+    options.setCheckpointingInterval(1000L);
+    options.setNumberOfExecutionRetries(5);
+    options.setExecutionRetryDelay(3000L);
+    options.setRunner(FlinkPipelineRunner.class);
+
+    PTransform<? super PBegin, PCollection<String>> readSource =
+            Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream");
+    WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+    // Create the pipeline.
+    Pipeline p = Pipeline.create(options);
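+
+    // Read words from the socket source, window them with watermark-triggered
+    // fixed windows, and compute the top 10 completions per prefix.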
+    PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
+      .apply(readSource)
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .apply(Window.<String>into(windowFn)
+          .triggering(AfterWatermark.pastEndOfWindow())
+          .withAllowedLateness(Duration.ZERO)
+          .discardingFiredPanes())
+      .apply(ComputeTopCompletions.top(10, options.getRecursive()));
+
+    toWrite
+      .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile()))
+      .apply(TextIO.Write.to("./outputAutoComplete.txt"));
+
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
new file mode 100644
index 0000000..3a8bdb0
--- /dev/null
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.beam.runners.flink.examples.streaming;
+
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.joda.time.Duration;
+
+/**
+ * To run the example, first open two sockets on two terminals by executing the commands:
+ * <ul>
+ *   <li><code>nc -lk 9999</code>, and</li>
+ *   <li><code>nc -lk 9998</code></li>
+ * </ul>
+ * and then launch the example. Whatever you type into the terminals becomes
+ * the input to the program.
+ */
+public class JoinExamples {
+
+  static PCollection<String> joinEvents(PCollection<String> streamA,
+                      PCollection<String> streamB) throws Exception {
+
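+    // Tags that identify each input stream within the CoGbkResult.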
+    final TupleTag<String> firstInfoTag = new TupleTag<>();
+    final TupleTag<String> secondInfoTag = new TupleTag<>();
+
+    // Transform both input collections into keyed collections, using the
+    // first word of each line as the key.
+    PCollection<KV<String, String>> firstInfo = streamA.apply(
+        ParDo.of(new ExtractEventDataFn()));
+    PCollection<KV<String, String>> secondInfo = streamB.apply(
+        ParDo.of(new ExtractEventDataFn()));
+
+    // 'key' -> CoGbkResult containing all values from both streams for that key
+    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
+        .of(firstInfoTag, firstInfo)
+        .and(secondInfoTag, secondInfo)
+        .apply(CoGroupByKey.<String>create());
+
+    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+    // 'key' -> a string combining the values from the two streams
+    PCollection<KV<String, String>> finalResultCollection =
+        kvpCollection.apply(ParDo.named("Process").of(
+            new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+              private static final long serialVersionUID = 0;
+
+              @Override
+              public void processElement(ProcessContext c) {
+                KV<String, CoGbkResult> e = c.element();
+                String key = e.getKey();
+
+                String defaultA = "NO_VALUE";
+
+                // getOnly() expects at most one value for this key in the given
+                // stream: it returns the default if none is present and throws
+                // if there is more than one.
+
+                String lineA = e.getValue().getOnly(firstInfoTag, defaultA);
+                for (String lineB : c.element().getValue().getAll(secondInfoTag)) {
+                  // Generate a string that combines information from both collection values
+                  c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB));
+                }
+              }
+            }));
+
+    return finalResultCollection
+        .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
+          private static final long serialVersionUID = 0;
+
+          @Override
+          public void processElement(ProcessContext c) {
+            String result = c.element().getKey() + " -> " + c.element().getValue();
+            System.out.println(result);
+            c.output(result);
+          }
+        }));
+  }
+
+  static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
+    private static final long serialVersionUID = 0;
+
+    @Override
+    public void processElement(ProcessContext c) {
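+      // Use the first whitespace-separated token of the lower-cased line as the key.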
+      String line = c.element().toLowerCase();
+      String key = line.split("\\s")[0];
+      c.output(KV.of(key, line));
+    }
+  }
+
+  private interface Options extends WindowedWordCount.StreamingWordCountOptions {}
+
+  public static void main(String[] args) throws Exception {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    options.setStreaming(true);
+    options.setCheckpointingInterval(1000L);
+    options.setNumberOfExecutionRetries(5);
+    options.setExecutionRetryDelay(3000L);
+    options.setRunner(FlinkPipelineRunner.class);
+
+    PTransform<? super PBegin, PCollection<String>> readSourceA =
+        Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");
+    PTransform<? super PBegin, PCollection<String>> readSourceB =
+        Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream");
+
+    WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+    Pipeline p = Pipeline.create(options);
+
+    // The following two 'apply' calls create the two inputs to our pipeline,
+    // one for each of our two input sources.
+    PCollection<String> streamA = p.apply(readSourceA)
+        .apply(Window.<String>into(windowFn)
+            .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+            .discardingFiredPanes());
+    PCollection<String> streamB = p.apply(readSourceB)
+        .apply(Window.<String>into(windowFn)
+            .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+            .discardingFiredPanes());
+
+    PCollection<String> formattedResults = joinEvents(streamA, streamB);
+    formattedResults.apply(TextIO.Write.to("./outputJoin.txt"));
+    p.run();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
new file mode 100644
index 0000000..55cdc22
--- /dev/null
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.examples.streaming;
+
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.joda.time.Duration;
+
+import java.util.Properties;
+
+public class KafkaWindowedWordCountExample {
+
+  static final String KAFKA_TOPIC = "test";  // Default Kafka topic to read from
+  static final String KAFKA_BROKER = "localhost:9092";  // Default Kafka broker to contact
+  static final String GROUP_ID = "myGroup";  // Default groupId
+  static final String ZOOKEEPER = "localhost:2181";  // Default ZooKeeper server to connect to for Kafka
+
+  public static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
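+      // Format each count as "<word> - <count> @ <element timestamp>".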
+      String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
+      System.out.println(row);
+      c.output(row);
+    }
+  }
+
+  public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions {
+    @Description("The Kafka topic to read from")
+    @Default.String(KAFKA_TOPIC)
+    String getKafkaTopic();
+
+    void setKafkaTopic(String value);
+
+    @Description("The Kafka Broker to read from")
+    @Default.String(KAFKA_BROKER)
+    String getBroker();
+
+    void setBroker(String value);
+
+    @Description("The Zookeeper server to connect to")
+    @Default.String(ZOOKEEPER)
+    String getZookeeper();
+
+    void setZookeeper(String value);
+
+    @Description("The groupId")
+    @Default.String(GROUP_ID)
+    String getGroup();
+
+    void setGroup(String value);
+
+  }
+
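+  // Example invocation (flag names are derived from the option getter names):
+  //   --kafkaTopic=test --broker=localhost:9092 --zookeeper=localhost:2181 --group=myGroup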
+  public static void main(String[] args) {
+    PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
+    KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
+    options.setJobName("KafkaExample");
+    options.setStreaming(true);
+    options.setCheckpointingInterval(1000L);
+    options.setNumberOfExecutionRetries(5);
+    options.setExecutionRetryDelay(3000L);
+    options.setRunner(FlinkPipelineRunner.class);
+
+    System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " "
+        + options.getBroker() + " " + options.getGroup());
+    Pipeline pipeline = Pipeline.create(options);
+
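+    // Configuration for the Flink Kafka consumer: ZooKeeper for offset
+    // tracking, the broker list, and the consumer group id.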
+    Properties p = new Properties();
+    p.setProperty("zookeeper.connect", options.getZookeeper());
+    p.setProperty("bootstrap.servers", options.getBroker());
+    p.setProperty("group.id", options.getGroup());
+
+    // This is the Flink consumer that reads the program's input from a Kafka topic.
+    FlinkKafkaConsumer082<String> kafkaConsumer = new FlinkKafkaConsumer082<>(
+        options.getKafkaTopic(),
+        new SimpleStringSchema(), p);
+
+    PCollection<String> words = pipeline
+        .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount"))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
+            .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+            .discardingFiredPanes());
+
+    PCollection<KV<String, Long>> wordCounts =
+        words.apply(Count.<String>perElement());
+
+    wordCounts.apply(ParDo.of(new FormatAsStringFn()))
+        .apply(TextIO.Write.to("./outputKafka.txt"));
+
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
new file mode 100644
index 0000000..7eb69ba
--- /dev/null
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.examples.streaming;
+
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * To run the example, first open a socket on a terminal by executing the command:
+ * <ul>
+ *   <li><code>nc -lk 9999</code></li>
+ * </ul>
+ * and then launch the example. Whatever you type into the terminal becomes
+ * the input to the program.
+ */
+public class WindowedWordCount {
+
+  private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
+
+  static final long WINDOW_SIZE = 10;  // Default window duration in seconds
+  static final long SLIDE_SIZE = 5;  // Default window slide in seconds
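+  // With these defaults, every element belongs to two overlapping sliding windows.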
+
+  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
+      c.output(row);
+    }
+  }
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  public interface StreamingWordCountOptions extends org.apache.beam.runners.flink.examples.WordCount.Options {
+    @Description("Sliding window duration, in seconds")
+    @Default.Long(WINDOW_SIZE)
+    Long getWindowSize();
+
+    void setWindowSize(Long value);
+
+    @Description("Window slide, in seconds")
+    @Default.Long(SLIDE_SIZE)
+    Long getSlide();
+
+    void setSlide(Long value);
+  }
+
+  public static void main(String[] args) throws IOException {
+    StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class);
+    options.setStreaming(true);
+    options.setWindowSize(10L);
+    options.setSlide(5L);
+    options.setCheckpointingInterval(1000L);
+    options.setNumberOfExecutionRetries(5);
+    options.setExecutionRetryDelay(3000L);
+    options.setRunner(FlinkPipelineRunner.class);
+
+    LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() +
+        " sec. and a slide of " + options.getSlide());
+
+    Pipeline pipeline = Pipeline.create(options);
+
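+    // Read lines from the socket, split them into words, and assign the words
+    // to watermark-triggered sliding windows.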
+    PCollection<String> words = pipeline
+        .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount"))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
+            .every(Duration.standardSeconds(options.getSlide())))
+            .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+            .discardingFiredPanes());
+
+    PCollection<KV<String, Long>> wordCounts =
+        words.apply(Count.<String>perElement());
+
+    wordCounts.apply(ParDo.of(new FormatAsStringFn()))
+        .apply(TextIO.Write.to("./outputWordCount.txt"));
+
+    pipeline.run();
+  }
+}

