beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [44/50] [abbrv] incubator-beam git commit: fix import order
Date Tue, 13 Sep 2016 00:41:15 GMT
fix import order


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59ae94c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59ae94c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59ae94c5

Branch: refs/heads/gearpump-runner
Commit: 59ae94c59931732d5cf78c5431147d580f9ff747
Parents: 6cd48c4
Author: manuzhang <owenzhang1990@gmail.com>
Authored: Mon Sep 12 11:45:15 2016 +0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Sep 12 17:40:14 2016 -0700

----------------------------------------------------------------------
 runners/gearpump/pom.xml                        |  7 ++---
 .../gearpump/GearpumpPipelineOptions.java       |  8 +++---
 .../gearpump/GearpumpPipelineResult.java        |  4 ++-
 .../gearpump/GearpumpPipelineRunner.java        | 28 ++++++++++----------
 .../GearpumpPipelineRunnerRegistrar.java        |  9 +++----
 .../gearpump/GearpumpPipelineTranslator.java    |  7 +++--
 .../gearpump/examples/StreamingWordCount.java   | 15 +++--------
 .../gearpump/examples/UnboundedTextSource.java  | 15 ++++++-----
 .../translators/GroupByKeyTranslator.java       | 14 +++++-----
 .../translators/ParDoBoundMultiTranslator.java  | 16 +++++------
 .../translators/TransformTranslator.java        |  3 +--
 .../translators/TranslationContext.java         |  6 ++---
 .../translators/functions/DoFnFunction.java     | 14 +++++-----
 .../translators/io/BoundedSourceWrapper.java    |  4 +--
 .../gearpump/translators/io/GearpumpSource.java | 12 +++++----
 .../translators/io/UnboundedSourceWrapper.java  |  4 +--
 .../gearpump/translators/io/ValuesSource.java   | 12 ++++-----
 .../translators/utils/GearpumpDoFnRunner.java   | 28 ++++++++++----------
 .../translators/utils/NoOpSideInputReader.java  |  8 +++---
 .../translators/utils/NoOpStepContext.java      |  6 ++---
 .../main/java/org/apache/beam/sdk/Pipeline.java |  2 +-
 .../apache/beam/sdk/runners/PipelineRunner.java |  1 +
 .../beam/sdk/transforms/DoFnAdapters.java       |  5 ++++
 23 files changed, 114 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index cc99a7a..296de6b 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -122,7 +122,6 @@
       <groupId>org.apache.gearpump</groupId>
       <artifactId>gearpump-daemon_2.11</artifactId>
       <version>${gearpump.version}</version>
-      <scope>provided</scope>
       <exclusions>
         <exclusion>
           <groupId>org.apache.gearpump</groupId>
@@ -186,10 +185,6 @@
       <artifactId>jackson-annotations</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.google.http-client</groupId>
-      <artifactId>google-http-client</artifactId>
-    </dependency>
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
@@ -225,6 +220,7 @@
       <artifactId>auto-service</artifactId>
       <version>1.0-rc2</version>
     </dependency>
+
   </dependencies>
 
   <build>
@@ -287,6 +283,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
       </plugin>
+
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
index 5b6ee96..e02cbbc 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
@@ -18,17 +18,17 @@
 
 package org.apache.beam.runners.gearpump;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.Map;
+
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
 import org.apache.gearpump.cluster.client.ClientContext;
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
 
-import java.util.Map;
-
 /**
  * Options that configure the Gearpump pipeline.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 6184bc3..2011a4b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -17,14 +17,16 @@
  */
 package org.apache.beam.runners.gearpump;
 
+import java.io.IOException;
+
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.transforms.Aggregator;
+
 import org.joda.time.Duration;
 
-import java.io.IOException;
 
 /**
  * Result of executing a {@link Pipeline} with Gearpump.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
index 4182ee4..ad7bb3e 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
@@ -17,6 +17,13 @@
  */
 package org.apache.beam.runners.gearpump;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.runners.gearpump.translators.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -30,25 +37,18 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AssignWindows;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
-
 import org.apache.gearpump.cluster.ClusterConfig;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.cluster.client.ClientContext;
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * A {@link PipelineRunner} that executes the operations in the
  * pipeline by first translating them to Gearpump Stream DSL
@@ -79,16 +79,16 @@ public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResul
       PTransform<InputT, OutputT> transform, InputT input) {
     if (Window.Bound.class.equals(transform.getClass())) {
       return (OutputT) super.apply(
-          new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
+              new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
     } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
-        && ((PCollectionList<?>) input).size() == 0) {
-      return (OutputT) Pipeline.applyTransform(input, Create.of());
+            && ((PCollectionList<?>) input).size() == 0) {
+      return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
     } else if (Create.Values.class.equals(transform.getClass())) {
       return (OutputT) PCollection
-          .<OutputT>createPrimitiveOutputInternal(
-              input.getPipeline(),
-              WindowingStrategy.globalDefault(),
-              PCollection.IsBounded.BOUNDED);
+              .<OutputT>createPrimitiveOutputInternal(
+                      input.getPipeline(),
+                      WindowingStrategy.globalDefault(),
+                      PCollection.IsBounded.BOUNDED);
     } else {
       return super.apply(transform, input);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
index 2b9e89e..ca173d1 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
@@ -18,14 +18,14 @@
 
 package org.apache.beam.runners.gearpump;
 
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
 
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
 /**
  * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
  * {@link GearpumpPipelineRunner}.
@@ -44,8 +44,7 @@ public class GearpumpPipelineRunnerRegistrar {
 
     @Override
     public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
-          TestGearpumpRunner.class);
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(TestGearpumpRunner.class);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 59f0df7..5045ae4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.runners.gearpump;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.runners.gearpump.translators.CreateValuesTranslator;
 import org.apache.beam.runners.gearpump.translators.FlattenPCollectionTranslator;
 import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
@@ -41,9 +43,6 @@ import org.apache.gearpump.util.Graph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * {@link GearpumpPipelineTranslator} knows how to translate {@link Pipeline} objects
  * into Gearpump {@link Graph}.
@@ -109,7 +108,7 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
 
   @Override
   public void visitValue(PValue value, TransformTreeNode producer) {
-    LOG.debug("visiting value {}", value);
+    LOG.info("visiting value {}", value);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
index 5f35c6b..ba50de7 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
@@ -23,11 +23,9 @@ import org.apache.beam.runners.gearpump.GearpumpPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -45,15 +43,9 @@ import org.slf4j.LoggerFactory;
 public class StreamingWordCount {
 
   static class ExtractWordsFn extends OldDoFn<String, String> {
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
 
     @Override
     public void processElement(ProcessContext c) {
-      if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
-      }
-
       // Split the line into words.
       String[] words = c.element().split("[^a-zA-Z']+");
 
@@ -81,11 +73,12 @@ public class StreamingWordCount {
 
 
   public static void main(String[] args) {
-    GearpumpPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
-        .as(GearpumpPipelineOptions.class);
-    options.setApplicationName("StreamingWordCount");
+    GearpumpPipelineOptions options = PipelineOptionsFactory
+            .fromArgs(args).as(GearpumpPipelineOptions.class);
     options.setRunner(GearpumpPipelineRunner.class);
+    options.setApplicationName("StreamingWordCount");
     options.setParallelism(1);
+
     Pipeline p = Pipeline.create(options);
 
     PCollection<KV<String, Long>> wordCounts =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
index caf066c..b014432 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
@@ -18,13 +18,6 @@
 
 package org.apache.beam.runners.gearpump.examples;
 
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
@@ -33,6 +26,14 @@ import java.util.NoSuchElementException;
 
 import javax.annotation.Nullable;
 
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
+
 /**
  * unbounded source that reads from text.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index f36b908..43e3336 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -18,23 +18,25 @@
 
 package org.apache.beam.runners.gearpump.translators;
 
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
-import com.google.common.collect.Iterables;
-
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+
 
 /**
  * {@link GroupByKey} is translated to Gearpump groupBy function.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index d5ed0d2..2b49684 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -18,6 +18,14 @@
 
 package org.apache.beam.runners.gearpump.translators;
 
+import com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
@@ -25,8 +33,6 @@ import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -35,17 +41,11 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
-import com.google.common.collect.Lists;
-
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.FilterFunction;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
 
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 /**
  * {@link ParDo.BoundMulti} is translated to Gearpump flatMap function
  * with {@link DoFn} wrapped in {@link DoFnMultiFunction}. The outputs are

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
index 1ed6d5d..c8587d3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
@@ -18,11 +18,10 @@
 
 package org.apache.beam.runners.gearpump.translators;
 
+import java.io.Serializable;
 
 import org.apache.beam.sdk.transforms.PTransform;
 
-import java.io.Serializable;
-
 /**
  * translates {@link PTransform} to Gearpump functions.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index b9b2c7a..d3bc75d 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -20,6 +20,9 @@ package org.apache.beam.runners.gearpump.translators;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -33,9 +36,6 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
 import org.apache.gearpump.streaming.source.DataSource;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * Maintains context data for {@link TransformTranslator}s.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index b1ebd2a..8d16356 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -18,26 +18,26 @@
 
 package org.apache.beam.runners.gearpump.translators.functions;
 
+import com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
-import com.google.api.client.util.Lists;
-
 import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
 
-import java.util.Iterator;
-import java.util.List;
-
 /**
  * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
index f25d113..f889101 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
@@ -18,12 +18,12 @@
 
 package org.apache.beam.runners.gearpump.translators.io;
 
+import java.io.IOException;
+
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
 
-import java.io.IOException;
-
 /**
  * wrapper over BoundedSource for Gearpump DataSource API.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 892ccc3..8f2beb2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -18,23 +18,23 @@
 
 package org.apache.beam.runners.gearpump.translators.io;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.gearpump.Message;
 import org.apache.gearpump.streaming.source.DataSource;
 import org.apache.gearpump.streaming.task.TaskContext;
 
 import org.joda.time.Instant;
 
-import java.io.IOException;
-
 /**
  * common methods for {@link BoundedSourceWrapper} and {@link UnboundedSourceWrapper}.
  */
@@ -61,6 +61,7 @@ public abstract class GearpumpSource<T> implements DataSource {
       PipelineOptions options = new ObjectMapper()
           .readValue(serializedOptions, PipelineOptions.class);
       this.reader = createReader(options);
+      this.available = reader.start();
     } catch (Exception e) {
       throw new RuntimeException(e);
     } finally {
@@ -97,4 +98,5 @@ public abstract class GearpumpSource<T> implements DataSource {
       throw new RuntimeException(e);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
index b39f29f..dfdecb2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
@@ -18,12 +18,12 @@
 
 package org.apache.beam.runners.gearpump.translators.io;
 
+import java.io.IOException;
+
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 
-import java.io.IOException;
-
 /**
  * wrapper over UnboundedSource for Gearpump DataSource API.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index 24055f7..9359e35 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -18,12 +18,6 @@
 
 package org.apache.beam.runners.gearpump.translators.io;
 
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -36,6 +30,12 @@ import java.util.NoSuchElementException;
 
 import javax.annotation.Nullable;
 
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
 /**
  * unbounded source that reads from a Java {@link Iterable}.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
index be0d025..e205575 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
@@ -18,6 +18,20 @@
 
 package org.apache.beam.runners.gearpump.translators.utils;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -31,11 +45,8 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.SimpleDoFnRunner;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -46,19 +57,8 @@ import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
 import org.joda.time.Instant;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
 /**
  * a serializable {@link SimpleDoFnRunner}.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
index 600ebfb..d1a9198 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
@@ -18,14 +18,14 @@
 
 package org.apache.beam.runners.gearpump.translators.utils;
 
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.values.PCollectionView;
-
 import java.io.Serializable;
 
 import javax.annotation.Nullable;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
 /**
  * no-op side input reader.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
index ce0935a..45f146b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
@@ -18,6 +18,9 @@
 
 package org.apache.beam.runners.gearpump.translators.utils;
 
+import java.io.IOException;
+import java.io.Serializable;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ExecutionContext;
@@ -26,9 +29,6 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;
 
-import java.io.IOException;
-import java.io.Serializable;
-
 /**
  * serializable {@link ExecutionContext.StepContext} that basically does nothing.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 53f46f6..e95304d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -134,7 +134,7 @@ public class Pipeline {
    */
   public static Pipeline create(PipelineOptions options) {
     Pipeline pipeline = new Pipeline(PipelineRunner.fromOptions(options), options);
-    LOG.debug("Creating {}", pipeline);
+    LOG.info("Creating {}", pipeline);
     return pipeline;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index ede1507..1ec4103 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -57,6 +57,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
         .fromFactoryMethod("fromOptions")
         .withArg(PipelineOptions.class, options)
         .build();
+    System.out.println("runner: " + result.getClass().getName());
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 4803d77..642971f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import java.io.IOException;
+
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -31,6 +32,8 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utility class containing adapters for running a {@link DoFn} as an {@link OldDoFn}.
@@ -72,6 +75,8 @@ public class DoFnAdapters {
   private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
     private final DoFn<InputT, OutputT> fn;
     private transient DoFnInvoker<InputT, OutputT> invoker;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(SimpleDoFnAdapter.class);
 
     SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
       super(fn.aggregators);



Mime
View raw message