Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BC4C3200BC9 for ; Sat, 12 Nov 2016 03:28:27 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id BAFA2160B18; Sat, 12 Nov 2016 02:28:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8F9A2160B16 for ; Sat, 12 Nov 2016 03:28:26 +0100 (CET) Received: (qmail 15952 invoked by uid 500); 12 Nov 2016 02:28:25 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 15840 invoked by uid 99); 12 Nov 2016 02:28:25 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 12 Nov 2016 02:28:25 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 55D06C246E for ; Sat, 12 Nov 2016 02:28:25 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id UR-UdEioGSik for ; Sat, 12 Nov 2016 02:28:23 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 74E365F306 for ; Sat, 12 Nov 2016 02:28:20 +0000 (UTC) Received: (qmail 14628 invoked by uid 99); 12 Nov 2016 02:28:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 12 Nov 2016 02:28:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 69B4FF1715; Sat, 12 Nov 2016 02:28:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Sat, 12 Nov 2016 02:28:25 -0000 Message-Id: In-Reply-To: <243dc18a98a4482dbe87c9edfad4e7e8@git.apache.org> References: <243dc18a98a4482dbe87c9edfad4e7e8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/39] incubator-beam git commit: BEAM-261 Checkpointing for pushed back inputs. archived-at: Sat, 12 Nov 2016 02:28:27 -0000 BEAM-261 Checkpointing for pushed back inputs. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fd7f46c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fd7f46c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fd7f46c1 Branch: refs/heads/master Commit: fd7f46c19b9c95a63b522793bb6fb8a849167cbc Parents: 047cff4 Author: Thomas Weise Authored: Thu Oct 13 00:56:37 2016 -0700 Committer: Thomas Weise Committed: Mon Oct 17 09:22:16 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/apex/ApexRunner.java | 3 +- .../translators/ParDoBoundMultiTranslator.java | 10 ++- .../apex/translators/ParDoBoundTranslator.java | 11 ++- .../functions/ApexParDoOperator.java | 32 +++++--- .../utils/ValueAndCoderKryoSerializable.java | 81 ++++++++++++++++++++ .../translators/ParDoBoundTranslatorTest.java | 42 ++++++---- 6 files changed, 149 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index e2ebc29..ad49f08 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -72,7 +72,7 @@ public class ApexRunner extends PipelineRunner { private final ApexPipelineOptions options; /** - * TODO: this isn't thread sa + * TODO: this isn't thread safe and may cause issues when tests run in parallel * Holds any most resent assertion error that was raised while processing elements. * Used in the unit test driver in embedded to propagate the exception. */ @@ -89,7 +89,6 @@ public class ApexRunner extends PipelineRunner { @Override public OutputT apply( PTransform transform, InputT input) { -//System.out.println("transform: " + transform); if (Window.Bound.class.equals(transform.getClass())) { return (OutputT) ((PCollection) input).apply( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java index 6488bf6..9c5f2b5 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java @@ -22,8 +22,11 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; @@ -43,10 +46,15 @@ public class ParDoBoundMultiTranslator implements TransformTran public void translate(ParDo.BoundMulti transform, TranslationContext context) { OldDoFn doFn = transform.getFn(); PCollectionTuple output = context.getOutput(); + PCollection input = context.getInput(); List> sideInputs = transform.getSideInputs(); + Coder inputCoder = input.getCoder(); + WindowedValueCoder wvInputCoder = FullWindowedValueCoder.of(inputCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + ApexParDoOperator operator = new ApexParDoOperator<>(context.getPipelineOptions(), doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(), - context.>getInput().getWindowingStrategy(), sideInputs); + context.>getInput().getWindowingStrategy(), sideInputs, wvInputCoder); Map, PCollection> outputs = output.getAll(); Map, OutputPort> ports = Maps.newHashMapWithExpectedSize(outputs.size()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java index fa3df7c..8a7dd4b 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java @@ -21,8 +21,12 @@ package org.apache.beam.runners.apex.translators; import java.util.List; import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -41,10 +45,15 @@ public class ParDoBoundTranslator implements public void translate(ParDo.Bound transform, TranslationContext context) { OldDoFn doFn = transform.getFn(); PCollection output = context.getOutput(); + PCollection input = context.getInput(); List> sideInputs = transform.getSideInputs(); + Coder inputCoder = input.getCoder(); + WindowedValueCoder wvInputCoder = FullWindowedValueCoder.of(inputCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + ApexParDoOperator operator = new ApexParDoOperator<>(context.getPipelineOptions(), doFn, new TupleTag(), TupleTagList.empty().getAll() /*sideOutputTags*/, - output.getWindowingStrategy(), sideInputs); + output.getWindowingStrategy(), sideInputs, wvInputCoder); context.addOperator(operator, operator.output); context.addStream(context.getInput(), operator.input); if (!sideInputs.isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java index 995fee1..a951ca7 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java @@ -26,11 +26,14 @@ import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translators.utils.NoOpStepContext; import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions; +import org.apache.beam.runners.apex.translators.utils.ValueAndCoderKryoSerializable; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.repackaged.com.google.common.base.Throwables; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; @@ -84,8 +87,7 @@ public class ApexParDoOperator extends BaseOperator implements // TODO: not Kryo serializable, integrate codec //@Bind(JavaSerializer.class) private transient StateInternals sideInputStateInternals = InMemoryStateInternals.forKey(null); - // TODO: not Kryo serializable, integrate codec - private List> pushedBack = new ArrayList<>(); + private final ValueAndCoderKryoSerializable>> pushedBack; private LongMin pushedBackWatermark = new LongMin(); private long currentInputWatermark = Long.MIN_VALUE; private long currentOutputWatermark = currentInputWatermark; @@ -100,7 +102,8 @@ private transient StateInternals sideInputStateInternals = InMemoryStateIn TupleTag mainOutputTag, List> sideOutputTags, WindowingStrategy windowingStrategy, - List> sideInputs + List> sideInputs, + Coder> inputCoder ) { this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions); @@ -110,19 +113,28 @@ private transient StateInternals sideInputStateInternals = InMemoryStateIn this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; - if (sideOutputTags != null && sideOutputTags.size() > sideOutputPorts.length) { + if (sideOutputTags.size() > sideOutputPorts.length) { String msg = String.format("Too many side outputs (currently only supporting %s).", sideOutputPorts.length); throw new UnsupportedOperationException(msg); } + + Coder>> coder = ListCoder.of(inputCoder); + this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList>(), coder); + } @SuppressWarnings("unused") // for Kryo private ApexParDoOperator() { - this(null, null, null, null, null, null); + this.pipelineOptions = null; + this.doFn = null; + this.mainOutputTag = null; + this.sideOutputTags = null; + this.windowingStrategy = null; + this.sideInputs = null; + this.pushedBack = null; } - public final transient DefaultInputPort>> input = new DefaultInputPort>>() { @Override @@ -137,7 +149,7 @@ private transient StateInternals sideInputStateInternals = InMemoryStateIn Iterable> justPushedBack = processElementInReadyWindows(t.getValue()); for (WindowedValue pushedBackValue : justPushedBack) { pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis()); - pushedBack.add(pushedBackValue); + pushedBack.get().add(pushedBackValue); } } } @@ -162,16 +174,16 @@ private transient StateInternals sideInputStateInternals = InMemoryStateIn sideInputHandler.addSideInputValue(sideInput, t.getValue()); List> newPushedBack = new ArrayList<>(); - for (WindowedValue elem : pushedBack) { + for (WindowedValue elem : pushedBack.get()) { Iterable> justPushedBack = processElementInReadyWindows(elem); Iterables.addAll(newPushedBack, justPushedBack); } - pushedBack.clear(); + pushedBack.get().clear(); pushedBackWatermark.clear(); for (WindowedValue pushedBackValue : newPushedBack) { pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis()); - pushedBack.add(pushedBackValue); + pushedBack.get().add(pushedBackValue); } // potentially emit watermark http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java new file mode 100644 index 0000000..2de737d --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translators.utils; + +import java.io.IOException; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoSerializable; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Throwables; + + +/** + * A {@link KryoSerializable} holder that uses the specified {@link Coder}. + * @param + */ +public class ValueAndCoderKryoSerializable implements KryoSerializable +{ + private static JavaSerializer JAVA_SERIALIZER = new JavaSerializer(); + private T value; + private Coder coder; + + public ValueAndCoderKryoSerializable(T value, Coder coder) { + this.value = value; + this.coder = coder; + } + + @SuppressWarnings("unused") // for Kryo + private ValueAndCoderKryoSerializable() { + } + + public T get() { + return value; + } + + @Override + public void write(Kryo kryo, Output output) + { + try { + kryo.writeClass(output, coder.getClass()); + kryo.writeObject(output, coder, JAVA_SERIALIZER); + coder.encode(value, output, Context.OUTER); + } catch (IOException e) { + Throwables.propagate(e); + } + } + + @Override + public void read(Kryo kryo, Input input) + { + try { + @SuppressWarnings("unchecked") + Class> type = kryo.readClass(input).getType(); + coder = kryo.readObject(input, type, JAVA_SERIALIZER); + value = coder.decode(input, Context.OUTER); + } catch (IOException e) { + Throwables.propagate(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java index 301f6f8..b9748ee 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java @@ -18,47 +18,48 @@ package org.apache.beam.runners.apex.translators; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.regex.Pattern; + import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.ApexRunnerResult; import org.apache.beam.runners.apex.TestApexRunner; -import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator; import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; - -import com.datatorrent.api.DAG; -import com.datatorrent.lib.util.KryoCloneUtils; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.regex.Pattern; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** * integration test for {@link ParDoBoundTranslator}. @@ -181,9 +182,18 @@ public class ParDoBoundTranslatorTest { public void testSerialization() throws Exception { ApexPipelineOptions options = PipelineOptionsFactory.create() .as(ApexPipelineOptions.class); + options.setRunner(TestApexRunner.class); + Pipeline pipeline = Pipeline.create(options); + Coder> coder = WindowedValue.getValueOnlyCoder(VarIntCoder.of()); + + PCollectionView singletonView = pipeline.apply(Create.of(1)) + .apply(Sum.integersGlobally().asSingletonView()); + ApexParDoOperator operator = new ApexParDoOperator<>(options, new Add(0), new TupleTag(), TupleTagList.empty().getAll(), - WindowingStrategy.globalDefault(), Collections.> emptyList()); + WindowingStrategy.globalDefault(), + Collections.>singletonList(singletonView), + coder); operator.setup(null); operator.beginWindow(0); WindowedValue wv = WindowedValue.valueInGlobalWindow(0);