Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4DE2F200C92 for ; Mon, 12 Jun 2017 18:55:23 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4CBF2160BEE; Mon, 12 Jun 2017 16:55:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F1E13160BF1 for ; Mon, 12 Jun 2017 18:55:21 +0200 (CEST) Received: (qmail 12512 invoked by uid 500); 12 Jun 2017 16:55:20 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 9263 invoked by uid 99); 12 Jun 2017 16:55:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Jun 2017 16:55:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DBBE0F4A64; Mon, 12 Jun 2017 16:55:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Mon, 12 Jun 2017 16:56:04 -0000 Message-Id: <10f0e185e77341279ed0efa6c9c71c43@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [49/50] [abbrv] beam git commit: Fix side input handling in DoFnFunction archived-at: Mon, 12 Jun 2017 16:55:23 -0000 Fix side input handling in DoFnFunction Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7653e7ed Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7653e7ed Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7653e7ed Branch: refs/heads/gearpump-runner Commit: 7653e7ed6de3d9db822dcd390d2bf70819954fa5 Parents: 98854d4 Author: manuzhang Authored: Wed Jun 7 14:08:04 2017 +0800 Committer: manuzhang Committed: Mon Jun 12 11:45:37 2017 +0800 ---------------------------------------------------------------------- .../translators/TranslationContext.java | 2 ++ .../translators/functions/DoFnFunction.java | 23 ++++---------------- .../gearpump/translators/io/GearpumpSource.java | 1 - .../translators/utils/TranslatorUtils.java | 5 ++--- 4 files changed, 8 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java index 4090354..64a1e0d 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java @@ -68,6 +68,8 @@ public class TranslationContext { public void setOutputStream(PValue output, JavaStream outputStream) { if (!streams.containsKey(output)) { streams.put(output, outputStream); + } else { + throw new RuntimeException("set stream for duplicated output " + output); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java index f521d7b..6e4fbeb 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java @@ -24,7 +24,6 @@ import com.google.common.collect.Lists; import java.io.Serializable; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -66,7 +65,6 @@ public class DoFnFunction extends private transient PushbackSideInputDoFnRunner doFnRunner; private transient SideInputHandler sideInputReader; private transient List> pushedBackValues; - private transient Map, List>>> sideInputValues; private final Collection> sideInputs; private final Map> tagsToSideInputs; private final TupleTag mainOutput; @@ -109,7 +107,6 @@ public class DoFnFunction extends doFnRunner = doFnRunnerFactory.createRunner(sideInputReader); pushedBackValues = new LinkedList<>(); - sideInputValues = new HashMap<>(); outputManager.setup(mainOutput, sideOutputs); } @@ -132,25 +129,14 @@ public class DoFnFunction extends } else { // side input PCollectionView sideInput = tagsToSideInputs.get(unionValue.getUnionTag()); - WindowedValue sideInputValue = - (WindowedValue) unionValue.getValue(); - Object value = sideInputValue.getValue(); - if (!(value instanceof Iterable)) { - sideInputValue = sideInputValue.withValue(Lists.newArrayList(value)); - } - if (!sideInputValues.containsKey(sideInput)) { - sideInputValues.put(sideInput, new LinkedList>>()); - } - sideInputValues.get(sideInput).add((WindowedValue>) sideInputValue); + WindowedValue> sideInputValue = + (WindowedValue>) unionValue.getValue(); + sideInputReader.addSideInputValue(sideInput, sideInputValue); } } + for (PCollectionView sideInput: sideInputs) { - if (sideInputValues.containsKey(sideInput)) { - for (WindowedValue> value: sideInputValues.get(sideInput)) { - sideInputReader.addSideInputValue(sideInput, value); - } - } for (WindowedValue value : pushedBackValues) { for (BoundedWindow win: value.getWindows()) { BoundedWindow sideInputWindow = @@ -171,7 +157,6 @@ public class DoFnFunction extends } pushedBackValues.clear(); Iterables.addAll(pushedBackValues, nextPushedBackValues); - sideInputValues.clear(); doFnRunner.finishBundle(); http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java index 5e79151..60f319d 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java @@ -28,7 +28,6 @@ import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; -// import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java index 999afae..282f261 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java @@ -70,11 +70,10 @@ public class TranslatorUtils { JavaStream> inputStream, Map> tagsToSideInputs) { JavaStream mainStream = - inputStream.map(new ToRawUnionValue("0"), "map_to_RawUnionValue"); + inputStream.map(new ToRawUnionValue<>("0"), "map_to_RawUnionValue"); for (Map.Entry> tagToSideInput: tagsToSideInputs.entrySet()) { - // actually JavaStream>> - JavaStream> sideInputStream = context.getInputStream( + JavaStream>> sideInputStream = context.getInputStream( tagToSideInput.getValue()); mainStream = mainStream.merge(sideInputStream.map(new ToRawUnionValue<>( tagToSideInput.getKey()), "map_to_RawUnionValue"), "merge_to_MainStream");