beam-commits mailing list archives

From k...@apache.org
Subject [26/39] incubator-beam git commit: BEAM-844 Add README, wordcount test, fix View.AsIterable, initial GBK watermark.
Date Sat, 12 Nov 2016 02:28:44 GMT
BEAM-844 Add README, wordcount test, fix View.AsIterable, initial GBK watermark.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a21550f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a21550f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a21550f7

Branch: refs/heads/master
Commit: a21550f7ae3004e460ca6dfb33102a9fb191356c
Parents: 51af7e5
Author: Thomas Weise <thw@apache.org>
Authored: Thu Nov 3 05:40:46 2016 +0100
Committer: Thomas Weise <thw@apache.org>
Committed: Fri Nov 4 23:01:17 2016 +0100

----------------------------------------------------------------------
 runners/apex/README.md                          |  76 ++++++++
 runners/apex/pom.xml                            |   1 +
 .../apache/beam/runners/apex/ApexRunner.java    |  82 ++++++++
 .../functions/ApexGroupByKeyOperator.java       |   2 +-
 .../io/ApexReadUnboundedInputOperator.java      |   2 +-
 .../apex/translators/utils/ApexStreamTuple.java |  21 ++-
 .../apex/examples/StreamingWordCountTest.java   | 121 ------------
 .../runners/apex/examples/WordCountTest.java    | 188 +++++++++++++++++++
 .../translators/ApexGroupByKeyOperatorTest.java | 112 +++++++++++
 .../translators/GroupByKeyTranslatorTest.java   |   1 +
 runners/apex/src/test/resources/words.txt       |   3 +
 11 files changed, 484 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/README.md
----------------------------------------------------------------------
diff --git a/runners/apex/README.md b/runners/apex/README.md
new file mode 100644
index 0000000..c9e47a1
--- /dev/null
+++ b/runners/apex/README.md
@@ -0,0 +1,76 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+Apex Beam Runner (Apex-Runner)
+=============================
+
+Apex-Runner is a Runner for Apache Beam which executes Beam pipelines with Apache Apex as underlying engine. The runner has broad support for the Beam model and supports streaming and batch pipelines.
+
+[Apache Apex](http://apex.apache.org/) is a stream processing platform and framework for low-latency, high-throughput and fault-tolerant analytics applications on Apache Hadoop. Apex is Java based and also provides its own API for application development (native compositional and declarative Java API, SQL) with a comprehensive [operator library](https://github.com/apache/apex-malhar). Apex has a unified streaming architecture and can be used for real-time and batch processing. With its stateful stream processing architecture Apex can support all of the concepts in the Beam model (event time, triggers, watermarks etc.).
+
+##Status
+
+Apex-Runner is relatively new. It is fully functional and can currently be used to run pipelines in embedded mode. It does not take advantage of all the performance and scalability that Apex can deliver. This is expected to be addressed with upcoming work, leveraging features like incremental checkpointing, partitioning and operator affinity from Apex. Please see [JIRA](https://issues.apache.org/jira/issues/?jql=project%20%3D%20BEAM%20AND%20component%20%3D%20runner-apex%20AND%20resolution%20%3D%20Unresolved) and we welcome contributions!
+
+##Getting Started
+
+The following shows how to run the WordCount example that is provided with the source code on Apex (the example is identical with the one provided as part of the Beam examples).
+
+###Installing Beam
+
+To get the latest version of Beam with Apex-Runner, first clone the Beam repository:
+
+```
+git clone https://github.com/apache/incubator-beam
+```
+
+Then switch to the newly created directory and run Maven to build Apache Beam:
+
+```
+cd incubator-beam
+mvn clean install -DskipTests
+```
+
+Now Apache Beam and the Apex Runner are installed in your local Maven repository.
+
+###Running an Example
+
+Download something to count:
+
+```
+curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /tmp/kinglear.txt
+```
+
+Run the pipeline, using the Apex runner:
+
+```
+cd examples/java
+mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=/tmp/kinglear.txt --output=/tmp/wordcounts.txt --runner=ApexRunner" -Pinclude-runners
+```
+
+Once completed, there will be multiple output files with the base name given above:
+
+```
+$ ls /tmp/wordcounts.txt-*
+/tmp/wordcounts.txt-00000-of-00003  /tmp/wordcounts.txt-00001-of-00003  /tmp/wordcounts.txt-00002-of-00003
+```
+
+##Running pipelines on an Apex YARN cluster
+
+Coming soon.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index d4bcc3d..1ca61b9 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -215,6 +215,7 @@
               <ignoredUsedUndeclaredDependencies>
                 <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.5.0-SNAPSHOT</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:2.1</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::2.24.0</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.2.1</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 416e99c..661308d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -25,12 +25,16 @@ import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.StreamingApplication;
 import com.google.common.base.Throwables;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.beam.runners.apex.translators.TranslationContext;
 import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -104,6 +108,10 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
       PTransform<InputT, OutputT> customTransform = (PTransform)
           new StreamingViewAsSingleton<InputT>(this, (View.AsSingleton) transform);
       return Pipeline.applyTransform(input, customTransform);
+    } else if (View.AsIterable.class.equals(transform.getClass())) {
+      PTransform<InputT, OutputT> customTransform = (PTransform)
+          new StreamingViewAsIterable<InputT>(this, (View.AsIterable) transform);
+      return Pipeline.applyTransform(input, customTransform);
     } else {
       return super.apply(transform, input);
     }
@@ -317,4 +325,78 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
   }
 
+  private static class StreamingViewAsIterable<T>
+      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    public StreamingViewAsIterable(ApexRunner runner, View.AsIterable<T> transform) {
+    }
+
+    @Override
+    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+      PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(input.getPipeline(),
+          input.getWindowingStrategy(), input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateApexPCollectionView.<T, Iterable<T>> of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsIterable";
+    }
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+   * They require the input {@link PCollection} fits in memory.
+   * For a large {@link PCollection} this is expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
+
 }

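The `Concatenate` combiner added above follows the usual accumulate, merge, extract lifecycle of a combine function. As a minimal sketch of that lifecycle in plain Java (the class `ConcatenateSketch` is a hypothetical stand-in that does not use the Beam API, and the way a runner splits input into partial accumulators is only assumed here):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the Concatenate combiner; not part of the commit. */
final class ConcatenateSketch {

  /** Each partial aggregation starts from an empty list. */
  static <T> List<T> createAccumulator() {
    return new ArrayList<>();
  }

  /** Appends one input element to an accumulator and returns it. */
  static <T> List<T> addInput(List<T> accumulator, T input) {
    accumulator.add(input);
    return accumulator;
  }

  /** Concatenates all partial accumulators into a single list. */
  static <T> List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
    List<T> result = createAccumulator();
    for (List<T> accumulator : accumulators) {
      result.addAll(accumulator);
    }
    return result;
  }

  public static void main(String[] args) {
    // Two partial accumulators, as if built on separate bundles (an assumption).
    List<String> partA =
        addInput(addInput(ConcatenateSketch.<String>createAccumulator(), "a"), "b");
    List<String> partB = addInput(ConcatenateSketch.<String>createAccumulator(), "c");
    System.out.println(mergeAccumulators(Arrays.asList(partA, partB)));
  }
}
```

Because every element ends up in one in-memory list, the sketch also illustrates why the Javadoc above warns that the input must fit in memory.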
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
index d69aeab..98f3eca 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
@@ -103,7 +103,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   private transient ProcessContext context;
   private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
   private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
-  private Instant inputWatermark = new Instant(0);
+  private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
   public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input =
       new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() {

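One plausible reading of the watermark change above is that an initial watermark of epoch 0 already lies past any window near the minimum timestamp, so such windows would look closed before a single element arrives. The sketch below models that with plain `long` values. The constant is assumed to mirror `BoundedWindow.TIMESTAMP_MIN_VALUE` (`Long.MIN_VALUE` microseconds expressed in milliseconds), and `windowClosed` is a hypothetical, simplified rule rather than the operator's actual logic.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the initial-watermark choice; not part of the commit. */
final class InitialWatermarkSketch {

  // Assumption: mirrors BoundedWindow.TIMESTAMP_MIN_VALUE in milliseconds.
  static final long TIMESTAMP_MIN_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);

  /** Simplified rule: a window counts as closed once the watermark reaches its end. */
  static boolean windowClosed(long windowEndMillis, long watermarkMillis) {
    return watermarkMillis >= windowEndMillis;
  }

  public static void main(String[] args) {
    // A 10 second window starting at the minimum timestamp, as in the new operator test.
    long windowEnd = TIMESTAMP_MIN_MILLIS + 10_000L;

    // Old initial value (epoch 0): the window appears closed before any input arrives.
    System.out.println("watermark 0:   closed=" + windowClosed(windowEnd, 0L));

    // New initial value (minimum timestamp): the window is still open.
    System.out.println("watermark MIN: closed=" + windowClosed(windowEnd, TIMESTAMP_MIN_MILLIS));
  }
}
```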
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
index 0e2b0c2..61236ca 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
@@ -140,7 +140,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
         Instant timestamp = reader.getCurrentTimestamp();
         available = reader.advance();
         if (traceTuples) {
-          LOG.debug("\nemitting {}\n", data);
+          LOG.debug("\nemitting '{}' timestamp {}\n", data, timestamp);
         }
         output.emit(DataTuple.of(WindowedValue.of(
             data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
index 7f8b0fa..25518dc 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -28,6 +28,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
@@ -109,6 +110,22 @@ public interface ApexStreamTuple<T> {
     public void setTimestamp(long timestamp) {
       this.timestamp = timestamp;
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(timestamp);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof TimestampedTuple)) {
+        return false;
+      } else {
+        TimestampedTuple<?> other = (TimestampedTuple<?>) obj;
+        return (timestamp == other.timestamp) && Objects.equals(this.getValue(), other.getValue());
+      }
+    }
+
   }
 
   /**
@@ -164,10 +181,10 @@ public interface ApexStreamTuple<T> {
         throws CoderException, IOException {
       int b = inStream.read();
       if (b == 1) {
-        return new WatermarkTuple<T>(new DataInputStream(inStream).readLong());
+        return new WatermarkTuple<>(new DataInputStream(inStream).readLong());
       } else {
         int unionTag = inStream.read();
-        return new DataTuple<T>(valueCoder.decode(inStream, context), unionTag);
+        return new DataTuple<>(valueCoder.decode(inStream, context), unionTag);
       }
     }
 

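The `equals`/`hashCode` pair added to `TimestampedTuple` hashes only the timestamp while comparing both timestamp and value. That still satisfies the `Object` contract, because equal tuples necessarily share a timestamp and therefore a hash. A minimal sketch, using a hypothetical `TimestampedTupleSketch` class rather than the real `ApexStreamTuple` types:

```java
import java.util.Objects;

/** Hypothetical stand-in for TimestampedTuple; not part of the commit. */
final class TimestampedTupleSketch<T> {
  private final long timestamp;
  private final T value;

  TimestampedTupleSketch(long timestamp, T value) {
    this.timestamp = timestamp;
    this.value = value;
  }

  @Override
  public int hashCode() {
    // Hashes the timestamp only; tuples with different values may collide.
    return Objects.hash(timestamp);
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof TimestampedTupleSketch)) {
      return false;
    }
    TimestampedTupleSketch<?> other = (TimestampedTupleSketch<?>) obj;
    return timestamp == other.timestamp && Objects.equals(value, other.value);
  }

  public static void main(String[] args) {
    TimestampedTupleSketch<String> a = new TimestampedTupleSketch<>(42L, "foo");
    TimestampedTupleSketch<String> b = new TimestampedTupleSketch<>(42L, "foo");
    TimestampedTupleSketch<String> c = new TimestampedTupleSketch<>(42L, "bar");
    System.out.println(a.equals(b) + " " + (a.hashCode() == b.hashCode()));
    System.out.println(a.equals(c) + " " + (a.hashCode() == c.hashCode()));
  }
}
```

The trade-off is more hash collisions among tuples that share a timestamp, which matters little when the methods mainly serve test assertions such as the watermark comparison in `ApexGroupByKeyOperatorTest`.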
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
deleted file mode 100644
index 363e669..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.examples;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.runners.apex.ApexRunnerResult;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Windowed word count example on Apex runner.
- */
-public class StreamingWordCountTest {
-
-  static class ExtractWordsFn extends DoFn<String, String> {
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
-      }
-
-      // Split the line into words.
-      String[] words = c.element().split("[^a-zA-Z']+");
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
-    private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class);
-    static final ConcurrentHashMap<String, Long> RESULTS = new ConcurrentHashMap<>();
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      String row = c.element().getKey() + " - " + c.element().getValue()
-          + " @ " + c.timestamp().toString();
-      LOG.debug("output {}", row);
-      c.output(row);
-      RESULTS.put(c.element().getKey(), c.element().getValue());
-    }
-  }
-
-  @Test
-  public void testWindowedWordCount() throws Exception {
-    String[] args = new String[] {
-        "--runner=" + ApexRunner.class.getName()
-    };
-    ApexPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
-        .as(ApexPipelineOptions.class);
-    options.setApplicationName("StreamingWordCount");
-    Pipeline p = Pipeline.create(options);
-
-    PCollection<KV<String, Long>> wordCounts =
-        p.apply(Read.from(new UnboundedTextSource()))
-            .apply(ParDo.of(new ExtractWordsFn()))
-            .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
-            .apply(Count.<String>perElement());
-
-    wordCounts.apply(ParDo.of(new FormatAsStringFn()));
-
-    ApexRunnerResult result = (ApexRunnerResult) p.run();
-    Assert.assertNotNull(result.getApexDAG().getOperatorMeta("Read(UnboundedTextSource)"));
-    long timeout = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < timeout) {
-      if (FormatAsStringFn.RESULTS.containsKey("foo")
-          && FormatAsStringFn.RESULTS.containsKey("bar")) {
-        break;
-      }
-      result.waitUntilFinish(Duration.millis(1000));
-    }
-    result.cancel();
-    Assert.assertTrue(
-        FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar"));
-    FormatAsStringFn.RESULTS.clear();
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
new file mode 100644
index 0000000..28bb8ad
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.examples;
+
+import com.google.common.collect.Sets;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.runners.apex.TestApexRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.io.FileUtils;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Windowed word count example on Apex runner.
+ */
+public class WordCountTest {
+
+  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    private static final long serialVersionUID = 1L;
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      String row = c.element().getKey() + " - " + c.element().getValue()
+          + " @ " + c.timestamp().toString();
+      c.output(row);
+    }
+  }
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private static final long serialVersionUID = 1L;
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  /**
+   * Options for word count example.
+   */
+  public interface WordCountOptions extends ApexPipelineOptions {
+    @Description("Path of the file to read from")
+    @Validation.Required
+    String getInputFile();
+    void setInputFile(String value);
+
+    @Description("Path of the file to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) {
+    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+      .as(WordCountOptions.class);
+    Pipeline p = Pipeline.create(options);
+    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .apply(Count.<String>perElement())
+      .apply(ParDo.of(new FormatAsStringFn()))
+      .apply("WriteCounts", TextIO.Write.to(options.getOutput()))
+      ;
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testWordCountExample() throws Exception {
+    PipelineOptionsFactory.register(WordCountOptions.class);
+    WordCountOptions options = TestPipeline.testingPipelineOptions().as(WordCountOptions.class);
+    options.setRunner(TestApexRunner.class);
+    options.setApplicationName("StreamingWordCount");
+    String inputFile = WordCountTest.class.getResource("/words.txt").getFile();
+    options.setInputFile(new File(inputFile).getAbsolutePath());
+    String outputFilePrefix = "target/wordcountresult.txt";
+    options.setOutput(outputFilePrefix);
+    WordCountTest.main(TestPipeline.convertToArgs(options));
+
+    File outFile1 = new File(outputFilePrefix + "-00000-of-00002");
+    File outFile2 = new File(outputFilePrefix + "-00001-of-00002");
+    Assert.assertTrue(outFile1.exists() && outFile2.exists());
+    HashSet<String> results = new HashSet<>();
+    results.addAll(FileUtils.readLines(outFile1));
+    results.addAll(FileUtils.readLines(outFile2));
+    HashSet<String> expectedOutput = Sets.newHashSet(
+        "foo - 5 @ 294247-01-09T04:00:54.775Z",
+        "bar - 5 @ 294247-01-09T04:00:54.775Z"
+    );
+    Assert.assertEquals("expected output", expectedOutput, results);
+  }
+
+  static class CollectResultsFn extends DoFn<KV<String, Long>, String> {
+    static final ConcurrentHashMap<String, Long> RESULTS = new ConcurrentHashMap<>();
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      RESULTS.put(c.element().getKey(), c.element().getValue());
+    }
+  }
+
+  @Test
+  public void testWindowedWordCount() throws Exception {
+    String[] args = new String[] {
+        "--runner=" + ApexRunner.class.getName()
+    };
+    ApexPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+        .as(ApexPipelineOptions.class);
+    options.setApplicationName("StreamingWordCount");
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<KV<String, Long>> wordCounts =
+        p.apply(Read.from(new UnboundedTextSource()))
+            .apply(ParDo.of(new ExtractWordsFn()))
+            .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
+            .apply(Count.<String>perElement());
+
+    wordCounts.apply(ParDo.of(new CollectResultsFn()));
+
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
+    Assert.assertNotNull(result.getApexDAG().getOperatorMeta("Read(UnboundedTextSource)"));
+    long timeout = System.currentTimeMillis() + 30000;
+    while (System.currentTimeMillis() < timeout) {
+      if (CollectResultsFn.RESULTS.containsKey("foo")
+          && CollectResultsFn.RESULTS.containsKey("bar")) {
+        break;
+      }
+      result.waitUntilFinish(Duration.millis(1000));
+    }
+    result.cancel();
+    Assert.assertTrue(
+        CollectResultsFn.RESULTS.containsKey("foo") && CollectResultsFn.RESULTS.containsKey("bar"));
+    CollectResultsFn.RESULTS.clear();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java
new file mode 100644
index 0000000..3e8d575
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators;
+
+import com.datatorrent.api.Sink;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.TestApexRunner;
+import org.apache.beam.runners.apex.translators.functions.ApexGroupByKeyOperator;
+import org.apache.beam.runners.apex.translators.utils.ApexStateInternals;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link ApexGroupByKeyOperator}.
+ */
+public class ApexGroupByKeyOperatorTest {
+
+  @Test
+  public void testGlobalWindowMinTimestamp() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    options.setRunner(TestApexRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    WindowingStrategy<?, ?> ws = WindowingStrategy.of(FixedWindows.of(
+        Duration.standardSeconds(10)));
+    PCollection<KV<String, Integer>> input = PCollection.createPrimitiveOutputInternal(pipeline,
+        ws, IsBounded.BOUNDED);
+    input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
+    ApexGroupByKeyOperator<String, Integer> operator = new ApexGroupByKeyOperator<>(options,
+        input, new ApexStateInternals.ApexStateInternalsFactory<String>()
+        );
+
+    final List<Object> results = Lists.newArrayList();
+    Sink<Object> sink =  new Sink<Object>() {
+      @Override
+      public void put(Object tuple) {
+        results.add(tuple);
+      }
+      @Override
+      public int getCount(boolean reset) {
+        return 0;
+      }
+    };
+    operator.output.setSink(sink);
+    operator.setup(null);
+    operator.beginWindow(1);
+
+    Instant windowStart = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    BoundedWindow window = new IntervalWindow(windowStart, windowStart.plus(10000));
+    PaneInfo paneInfo = PaneInfo.NO_FIRING;
+
+    WindowedValue<KV<String, Integer>> wv1 =
+        WindowedValue.of(KV.of("foo", 1), windowStart, window, paneInfo);
+    operator.input.process(ApexStreamTuple.DataTuple.of(wv1));
+
+    WindowedValue<KV<String, Integer>> wv2 =
+        WindowedValue.of(KV.of("foo", 1), windowStart, window, paneInfo);
+    operator.input.process(ApexStreamTuple.DataTuple.of(wv2));
+
+    ApexStreamTuple<WindowedValue<KV<String, Integer>>> watermark =
+        ApexStreamTuple.WatermarkTuple.of(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+
+    Assert.assertEquals("number outputs", 0, results.size());
+    operator.input.process(watermark);
+    Assert.assertEquals("number outputs", 2, results.size());
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    ApexStreamTuple.DataTuple<WindowedValue<KV<String, Iterable<Integer>>>> dataTuple =
+        (ApexStreamTuple.DataTuple) results.get(0);
+    List<Integer> counts = Lists.newArrayList(1, 1);
+    Assert.assertEquals("iterable", KV.of("foo", counts), dataTuple.getValue().getValue());
+    Assert.assertEquals("expected watermark", watermark, results.get(1));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
index cb764d6..e67e29e 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
@@ -242,4 +242,5 @@ public class GroupByKeyTranslatorTest {
       }
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/test/resources/words.txt
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/resources/words.txt b/runners/apex/src/test/resources/words.txt
new file mode 100644
index 0000000..94151ee
--- /dev/null
+++ b/runners/apex/src/test/resources/words.txt
@@ -0,0 +1,3 @@
+
+foo foo foo bar bar
+foo foo bar bar bar

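To see why `testWordCountExample` expects a count of 5 for both `foo` and `bar`, the splitting and counting steps can be replayed on the three lines of `words.txt` without a runner. This is a sketch in plain Java: `WordCountSketch` and `countWords` are hypothetical names, and only the split pattern is taken from `ExtractWordsFn`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical runner-free replay of the word count; not part of the commit. */
final class WordCountSketch {

  /** Splits each line with the ExtractWordsFn pattern and counts non-empty words. */
  static Map<String, Long> countWords(List<String> lines) {
    Map<String, Long> counts = new TreeMap<>();
    for (String line : lines) {
      for (String word : line.split("[^a-zA-Z']+")) {
        if (!word.isEmpty()) {
          counts.merge(word, 1L, Long::sum);
        }
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    // The contents of words.txt: one empty line followed by two lines of words.
    List<String> lines = Arrays.asList("", "foo foo foo bar bar", "foo foo bar bar bar");
    System.out.println(countWords(lines));
  }
}
```

The empty first line contributes no words; in the real pipeline it only increments the `emptyLines` aggregator.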
