From: dhalperi@apache.org
To: commits@beam.incubator.apache.org
Date: Tue, 13 Sep 2016 00:41:15 -0000
Subject: [44/50] [abbrv] incubator-beam git commit: fix import order

fix import order

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59ae94c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59ae94c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59ae94c5

Branch: refs/heads/gearpump-runner
Commit: 59ae94c59931732d5cf78c5431147d580f9ff747
Parents: 6cd48c4
Author: manuzhang
Authored: Mon Sep 12 11:45:15 2016 +0800
Committer: Dan Halperin
Committed: Mon Sep 12 17:40:14 2016 -0700

----------------------------------------------------------------------
 runners/gearpump/pom.xml | 7 ++---
 .../gearpump/GearpumpPipelineOptions.java | 8 +++---
 .../gearpump/GearpumpPipelineResult.java | 4 ++-
 .../gearpump/GearpumpPipelineRunner.java | 28 ++++++++++----------
 .../GearpumpPipelineRunnerRegistrar.java | 9 +++----
 .../gearpump/GearpumpPipelineTranslator.java | 7 +++--
 .../gearpump/examples/StreamingWordCount.java | 15 +++--------
 .../gearpump/examples/UnboundedTextSource.java | 15 ++++++-----
 .../translators/GroupByKeyTranslator.java | 14 +++++-----
 .../translators/ParDoBoundMultiTranslator.java | 16 +++++------
 .../translators/TransformTranslator.java | 3 +--
 .../translators/TranslationContext.java | 6 ++---
 .../translators/functions/DoFnFunction.java | 14 +++++-----
 .../translators/io/BoundedSourceWrapper.java | 4 +--
 .../gearpump/translators/io/GearpumpSource.java | 12 +++++----
 .../translators/io/UnboundedSourceWrapper.java | 4 +--
 .../gearpump/translators/io/ValuesSource.java | 12 ++++-----
 .../translators/utils/GearpumpDoFnRunner.java | 28 ++++++++++----------
 .../translators/utils/NoOpSideInputReader.java | 8 +++---
 .../translators/utils/NoOpStepContext.java | 6 ++---
 .../main/java/org/apache/beam/sdk/Pipeline.java | 2 +-
 .../apache/beam/sdk/runners/PipelineRunner.java | 1 +
 .../beam/sdk/transforms/DoFnAdapters.java | 5 ++++
 23 files changed, 114 insertions(+), 114 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index cc99a7a..296de6b 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -122,7 +122,6 @@
 org.apache.gearpump
 gearpump-daemon_2.11
 ${gearpump.version}
- provided
 org.apache.gearpump
@@ -186,10 +185,6 @@
 jackson-annotations
- com.google.http-client
- google-http-client
-
-
 com.google.guava
 guava
@@ -225,6 +220,7 @@
 auto-service
 1.0-rc2
+
@@ -287,6 +283,7 @@
 org.apache.maven.plugins
 maven-checkstyle-plugin
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
index 5b6ee96..e02cbbc 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
@@ -18,17 +18,17 @@
 package org.apache.beam.runners.gearpump;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.Map;
+
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
 import org.apache.gearpump.cluster.client.ClientContext;
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
-import java.util.Map;
-
 /**
  * Options that configure the Gearpump pipeline.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 6184bc3..2011a4b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -17,14 +17,16 @@
  */
 package org.apache.beam.runners.gearpump;
+import java.io.IOException;
+
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.transforms.Aggregator;
+
 import org.joda.time.Duration;
-import java.io.IOException;
 /**
  * Result of executing a {@link Pipeline} with Gearpump.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
index 4182ee4..ad7bb3e 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
@@ -17,6 +17,13 @@
  */
 package org.apache.beam.runners.gearpump;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.runners.gearpump.translators.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -30,25 +37,18 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AssignWindows;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
-
 import org.apache.gearpump.cluster.ClusterConfig;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.cluster.client.ClientContext;
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * A {@link PipelineRunner} that executes the operations in the
  * pipeline by first translating them to Gearpump Stream DSL
@@ -79,16 +79,16 @@ public class GearpumpPipelineRunner extends PipelineRunner transform, InputT input) {
 if (Window.Bound.class.equals(transform.getClass())) {
 return (OutputT) super.apply(
- new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
+ new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
 } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
- && ((PCollectionList) input).size() == 0) {
- return (OutputT) Pipeline.applyTransform(input, Create.of());
+ && ((PCollectionList) input).size() == 0) {
+ return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
 } else if (Create.Values.class.equals(transform.getClass())) {
 return (OutputT) PCollection
- .createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.BOUNDED);
+ .createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.BOUNDED);
 } else {
 return super.apply(transform, input);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
index 2b9e89e..ca173d1 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
@@ -18,14 +18,14 @@
 package org.apache.beam.runners.gearpump;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
 /**
  * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
  * {@link GearpumpPipelineRunner}.
@@ -44,8 +44,7 @@ public class GearpumpPipelineRunnerRegistrar {
 @Override
 public Iterable>> getPipelineRunners() {
- return ImmutableList.>>of(
- TestGearpumpRunner.class);
+ return ImmutableList.>>of(TestGearpumpRunner.class);
 }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 59f0df7..5045ae4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.gearpump;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.runners.gearpump.translators.CreateValuesTranslator;
 import org.apache.beam.runners.gearpump.translators.FlattenPCollectionTranslator;
 import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
@@ -41,9 +43,6 @@ import org.apache.gearpump.util.Graph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * {@link GearpumpPipelineTranslator} knows how to translate {@link Pipeline} objects
  * into Gearpump {@link Graph}.
@@ -109,7 +108,7 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
 @Override
 public void visitValue(PValue value, TransformTreeNode producer) {
- LOG.debug("visiting value {}", value);
+ LOG.info("visiting value {}", value);
 }
 /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
index 5f35c6b..ba50de7 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
@@ -23,11 +23,9 @@ import org.apache.beam.runners.gearpump.GearpumpPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -45,15 +43,9 @@ import org.slf4j.LoggerFactory;
 public class StreamingWordCount {
 static class ExtractWordsFn extends OldDoFn {
- private final Aggregator emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
 @Override
 public void processElement(ProcessContext c) {
- if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
- }
-
 // Split the line into words.
 String[] words = c.element().split("[^a-zA-Z']+");
@@ -81,11 +73,12 @@ public class StreamingWordCount {
 public static void main(String[] args) {
- GearpumpPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
- .as(GearpumpPipelineOptions.class);
- options.setApplicationName("StreamingWordCount");
+ GearpumpPipelineOptions options = PipelineOptionsFactory
+ .fromArgs(args).as(GearpumpPipelineOptions.class);
 options.setRunner(GearpumpPipelineRunner.class);
+ options.setApplicationName("StreamingWordCount");
 options.setParallelism(1);
+
 Pipeline p = Pipeline.create(options);
 PCollection> wordCounts =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
index caf066c..b014432 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
@@ -18,13 +18,6 @@
 package org.apache.beam.runners.gearpump.examples;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
@@ -33,6 +26,14 @@ import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
+
 /**
  * unbounded source that reads from text.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index f36b908..43e3336 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -18,23 +18,25 @@
 package org.apache.beam.runners.gearpump.translators;
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
-import com.google.common.collect.Iterables;
-
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+
 /**
  * {@link GroupByKey} is translated to Gearpump groupBy function.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index d5ed0d2..2b49684 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -18,6 +18,14 @@
 package org.apache.beam.runners.gearpump.translators;
+import com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
@@ -25,8 +33,6 @@ import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -35,17 +41,11 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-import com.google.common.collect.Lists;
-
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.FilterFunction;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 /**
  * {@link ParDo.BoundMulti} is translated to Gearpump flatMap function
  * with {@link DoFn} wrapped in {@link DoFnMultiFunction}. The outputs are

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
index 1ed6d5d..c8587d3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
@@ -18,11 +18,10 @@
 package org.apache.beam.runners.gearpump.translators;
+import java.io.Serializable;
 import org.apache.beam.sdk.transforms.PTransform;
-import java.io.Serializable;
-
 /**
  * translates {@link PTransform} to Gearpump functions.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index b9b2c7a..d3bc75d 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -20,6 +20,9 @@ package org.apache.beam.runners.gearpump.translators;
 import static com.google.common.base.Preconditions.checkArgument;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -33,9 +36,6 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
 import org.apache.gearpump.streaming.source.DataSource;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * Maintains context data for {@link TransformTranslator}s.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index b1ebd2a..8d16356 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -18,26 +18,26 @@
 package org.apache.beam.runners.gearpump.translators.functions;
+import com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-import com.google.api.client.util.Lists;
-
 import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
-import java.util.Iterator;
-import java.util.List;
-
 /**
  * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
index f25d113..f889101 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
@@ -18,12 +18,12 @@
 package org.apache.beam.runners.gearpump.translators.io;
+import java.io.IOException;
+
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
-import java.io.IOException;
-
 /**
  * wrapper over BoundedSource for Gearpump DataSource API.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 892ccc3..8f2beb2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -18,23 +18,23 @@
 package org.apache.beam.runners.gearpump.translators.io;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.gearpump.Message;
 import org.apache.gearpump.streaming.source.DataSource;
 import org.apache.gearpump.streaming.task.TaskContext;
 import org.joda.time.Instant;
-import java.io.IOException;
-
 /**
  * common methods for {@link BoundedSourceWrapper} and {@link UnboundedSourceWrapper}.
  */
@@ -61,6 +61,7 @@ public abstract class GearpumpSource implements DataSource {
 PipelineOptions options = new ObjectMapper()
 .readValue(serializedOptions, PipelineOptions.class);
 this.reader = createReader(options);
+ this.available = reader.start();
 } catch (Exception e) {
 throw new RuntimeException(e);
 } finally {
@@ -97,4 +98,5 @@ public abstract class GearpumpSource implements DataSource {
 throw new RuntimeException(e);
 }
 }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
index b39f29f..dfdecb2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
@@ -18,12 +18,12 @@
 package org.apache.beam.runners.gearpump.translators.io;
+import java.io.IOException;
+
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
-import java.io.IOException;
-
 /**
  * wrapper over UnboundedSource for Gearpump DataSource API.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index 24055f7..9359e35 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -18,12 +18,6 @@
 package org.apache.beam.runners.gearpump.translators.io;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -36,6 +30,12 @@ import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
 /**
  * unbounded source that reads from a Java {@link Iterable}.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
index be0d025..e205575 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
@@ -18,6 +18,20 @@
 package org.apache.beam.runners.gearpump.translators.utils;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -31,11 +45,8 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.SimpleDoFnRunner;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -46,19 +57,8 @@ import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
 import org.joda.time.Instant;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
 /**
  * a serializable {@link SimpleDoFnRunner}.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
index 600ebfb..d1a9198 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
@@ -18,14 +18,14 @@
 package org.apache.beam.runners.gearpump.translators.utils;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.values.PCollectionView;
-
 import java.io.Serializable;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
 /**
  * no-op side input reader.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
index ce0935a..45f146b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
@@ -18,6 +18,9 @@

 package org.apache.beam.runners.gearpump.translators.utils;

+import java.io.IOException;
+import java.io.Serializable;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ExecutionContext;
@@ -26,9 +29,6 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;

-import java.io.IOException;
-import java.io.Serializable;
-
 /**
  * serializable {@link ExecutionContext.StepContext} that basically does nothing.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 53f46f6..e95304d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -134,7 +134,7 @@ public class Pipeline {
    */
   public static Pipeline create(PipelineOptions options) {
     Pipeline pipeline = new Pipeline(PipelineRunner.fromOptions(options), options);
-    LOG.debug("Creating {}", pipeline);
+    LOG.info("Creating {}", pipeline);
     return pipeline;
   }


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index ede1507..1ec4103 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -57,6 +57,7 @@ public abstract class PipelineRunner {
         .fromFactoryMethod("fromOptions")
         .withArg(PipelineOptions.class, options)
         .build();
+    System.out.println("runner: " + result.getClass().getName());
     return result;
   }


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59ae94c5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 4803d77..642971f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;

 import java.io.IOException;
+
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -31,6 +32,8 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * Utility class containing adapters for running a {@link DoFn} as an {@link OldDoFn}.
@@ -72,6 +75,8 @@ public class DoFnAdapters {
   private static class SimpleDoFnAdapter extends OldDoFn {
     private final DoFn fn;
     private transient DoFnInvoker invoker;
+    private static final Logger LOG =
+        LoggerFactory.getLogger(SimpleDoFnAdapter.class);

     SimpleDoFnAdapter(DoFn fn) {
       super(fn.aggregators);