Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 14B05200CE4 for ; Mon, 10 Jul 2017 06:49:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 132C8167B6B; Mon, 10 Jul 2017 04:49:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 04E7A167AD6 for ; Mon, 10 Jul 2017 06:49:36 +0200 (CEST) Received: (qmail 59750 invoked by uid 500); 10 Jul 2017 04:49:36 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 56500 invoked by uid 99); 10 Jul 2017 04:49:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Jul 2017 04:49:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7AD3DE96A8; Mon, 10 Jul 2017 04:49:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Mon, 10 Jul 2017 04:50:12 -0000 Message-Id: In-Reply-To: <475176a9da1b498e999831aa04004255@git.apache.org> References: <475176a9da1b498e999831aa04004255@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [42/43] beam git commit: Fix ParDoTest#testPipelineOptionsParameter archived-at: Mon, 10 Jul 2017 04:49:38 -0000 Fix ParDoTest#testPipelineOptionsParameter Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/725f547f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/725f547f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/725f547f Branch: refs/heads/gearpump-runner Commit: 725f547f5e487dd3e84d5d0f95c0fa3efa853279 Parents: 2206827 Author: manuzhang Authored: Sat Jul 8 00:13:19 2017 +0800 Committer: manuzhang Committed: Sat Jul 8 00:13:19 2017 +0800 ---------------------------------------------------------------------- .../gearpump/translators/io/GearpumpSource.java | 12 ++---------- .../translators/utils/DoFnRunnerFactory.java | 5 +++-- .../translators/utils/TranslatorUtils.java | 19 +++++++++++++++++++ 3 files changed, 24 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java index daa8c81..2f53139 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java @@ -18,9 +18,6 @@ package org.apache.beam.runners.gearpump.translators.io; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - import java.io.IOException; import java.time.Instant; @@ -48,11 +45,7 @@ public abstract class GearpumpSource implements DataSource { private boolean available = false; GearpumpSource(PipelineOptions options) { - try { - this.serializedOptions = new ObjectMapper().writeValueAsBytes(options); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + this.serializedOptions = TranslatorUtils.serializePipelineOptions(options); } protected abstract Source.Reader createReader(PipelineOptions options) throws IOException; @@ -60,8 +53,7 @@ public abstract class GearpumpSource implements DataSource { @Override public void open(TaskContext context, Instant startTime) { try { - PipelineOptions options = new ObjectMapper() - .readValue(serializedOptions, PipelineOptions.class); + PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions); this.reader = createReader(options); this.available = reader.start(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java index 35cf2b5..375b696 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java @@ -43,7 +43,7 @@ public class DoFnRunnerFactory implements Serializable { private static final long serialVersionUID = -4109539010014189725L; private final DoFn fn; - private final transient PipelineOptions options; + private final byte[] serializedOptions; private final Collection> sideInputs; private final DoFnRunners.OutputManager outputManager; private final TupleTag mainOutputTag; @@ -61,7 +61,7 @@ public class DoFnRunnerFactory implements Serializable { StepContext stepContext, WindowingStrategy windowingStrategy) { this.fn = doFn; - this.options = pipelineOptions; + this.serializedOptions = TranslatorUtils.serializePipelineOptions(pipelineOptions); this.sideInputs = sideInputs; this.outputManager = outputManager; this.mainOutputTag = mainOutputTag; @@ -72,6 +72,7 @@ public class DoFnRunnerFactory implements Serializable { public PushbackSideInputDoFnRunner createRunner( ReadyCheckingSideInputReader sideInputReader) { + PipelineOptions options = TranslatorUtils.deserializePipelineOptions(serializedOptions); DoFnRunner underlying = DoFnRunners.simpleRunner( options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext, windowingStrategy); http://git-wip-us.apache.org/repos/asf/beam/blob/725f547f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java index b1cd61c..c14298f 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java @@ -18,8 +18,11 @@ package org.apache.beam.runners.gearpump.translators.utils; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import java.io.IOException; import java.time.Instant; import java.util.Collection; import java.util.HashMap; @@ -27,6 +30,7 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.gearpump.translators.TranslationContext; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -141,6 +145,21 @@ public class TranslatorUtils { } } + public static byte[] serializePipelineOptions(PipelineOptions options) { + try { + return new ObjectMapper().writeValueAsBytes(options); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + public static PipelineOptions deserializePipelineOptions(byte[] serializedOptions) { + try { + return new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + } /** * This is copied from org.apache.beam.sdk.transforms.join.RawUnionValue.