Return-Path: X-Original-To: apmail-beam-commits-archive@minotaur.apache.org Delivered-To: apmail-beam-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BD30F1929B for ; Sat, 9 Apr 2016 00:04:17 +0000 (UTC) Received: (qmail 54882 invoked by uid 500); 9 Apr 2016 00:04:17 -0000 Delivered-To: apmail-beam-commits-archive@beam.apache.org Received: (qmail 54833 invoked by uid 500); 9 Apr 2016 00:04:17 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 54824 invoked by uid 99); 9 Apr 2016 00:04:17 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 09 Apr 2016 00:04:17 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 435BA1A0072 for ; Sat, 9 Apr 2016 00:04:17 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -2.727 X-Spam-Level: X-Spam-Status: No, score=-2.727 tagged_above=-999 required=6.31 tests=[FUZZY_VPILL=0.494, KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id G4stG4powWal for ; Sat, 9 Apr 2016 00:04:14 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 1D89D5F39A for ; Sat, 9 Apr 2016 00:04:13 +0000 (UTC) Received: (qmail 54741 invoked by uid 99); 9 Apr 
2016 00:04:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 09 Apr 2016 00:04:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 61560DFC73; Sat, 9 Apr 2016 00:04:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Sat, 09 Apr 2016 00:04:13 -0000 Message-Id: <7f72fadd772d4c72ab6110c0747ff10d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-beam git commit: Move expansion of Window.Bound into DirectPipelineRunner Repository: incubator-beam Updated Branches: refs/heads/master a43f9b820 -> a32a26208 Move expansion of Window.Bound into DirectPipelineRunner In the Beam model, windowing is a primitive concept. The expansion provided by the SDK is not implementable except via access to privileged methods not intended for Beam pipeline authors. This change is a precursor to eliminating these privileged methods entirely. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42969cb6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42969cb6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42969cb6 Branch: refs/heads/master Commit: 42969cb62222744c41debe575857fb7d093ce527 Parents: 5f24cef Author: Kenneth Knowles Authored: Thu Apr 7 17:34:19 2016 -0700 Committer: Kenneth Knowles Committed: Fri Apr 8 15:03:00 2016 -0700 ---------------------------------------------------------------------- .../sdk/runners/DirectPipelineRunner.java | 47 ++++++++++++++++++++ .../inprocess/WindowEvaluatorFactory.java | 13 +++--- .../sdk/transforms/windowing/Window.java | 21 ++------- .../cloud/dataflow/sdk/util/AssignWindows.java | 46 +++++++++++++++++++ 4 files changed, 104 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java index 35e392b..57e6116 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java @@ -47,7 +47,10 @@ import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.Partition; import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import 
com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; import com.google.cloud.dataflow.sdk.util.AppliedCombineFn; +import com.google.cloud.dataflow.sdk.util.AssignWindows; import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly; import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; @@ -57,6 +60,7 @@ import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunners; import com.google.cloud.dataflow.sdk.util.SerializableUtils; import com.google.cloud.dataflow.sdk.util.TestCredential; import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.util.common.Counter; import com.google.cloud.dataflow.sdk.util.common.CounterSet; import com.google.cloud.dataflow.sdk.values.KV; @@ -255,6 +259,9 @@ public class DirectPipelineRunner } else if (transform instanceof GroupByKey) { return (OutputT) ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform)); + } else if (transform instanceof Window.Bound) { + return (OutputT) + ((PCollection) input).apply(new AssignWindowsAndSetStrategy((Window.Bound) transform)); } else { return super.apply(transform, input); } @@ -400,6 +407,46 @@ public class DirectPipelineRunner } } + private static class AssignWindowsAndSetStrategy + extends PTransform, PCollection> { + + private final Window.Bound wrapped; + + public AssignWindowsAndSetStrategy(Window.Bound wrapped) { + this.wrapped = wrapped; + } + + @Override + public PCollection apply(PCollection input) { + WindowingStrategy outputStrategy = + wrapped.getOutputStrategyInternal(input.getWindowingStrategy()); + + WindowFn windowFn = + (WindowFn) outputStrategy.getWindowFn(); + + // If the Window.Bound transform only changed parts other than the WindowFn, then + // we skip AssignWindows even though it should be harmless in a perfect world. 
+ // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly + // crash if another GBK is performed without explicitly setting the WindowFn. So we skip + // AssignWindows in this case. + if (wrapped.getWindowFn() == null) { + return input.apply("Identity", ParDo.of(new IdentityFn())) + .setWindowingStrategyInternal(outputStrategy); + } else { + return input + .apply("AssignWindows", new AssignWindows(windowFn)) + .setWindowingStrategyInternal(outputStrategy); + } + } + } + + private static class IdentityFn extends DoFn { + @Override + public void processElement(ProcessContext c) { + c.output(c.element()); + } + } + /** * Apply the override for AvroIO.Write.Bound if the user requested sharding controls * greater than one. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java index 0bdfac9..e553dbb 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java @@ -61,23 +61,24 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory { if (fn == null) { return PassthroughTransformEvaluator.create(transform, outputBundle); } - return new WindowIntoEvaluator<>(fn, evaluationContext, outputBundle); + return new WindowIntoEvaluator<>(transform, fn, outputBundle); } private static class WindowIntoEvaluator implements TransformEvaluator { + private final AppliedPTransform, PCollection, Window.Bound> + transform; private final WindowFn windowFn; - private final 
InProcessEvaluationContext context; private final UncommittedBundle outputBundle; @SuppressWarnings("unchecked") public WindowIntoEvaluator( + AppliedPTransform, PCollection, Window.Bound> transform, WindowFn windowFn, - InProcessEvaluationContext context, UncommittedBundle outputBundle) { + this.outputBundle = outputBundle; + this.transform = transform; // Safe contravariant cast this.windowFn = (WindowFn) windowFn; - this.context = context; - this.outputBundle = outputBundle; } @Override @@ -98,7 +99,7 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory { @Override public InProcessTransformResult finishBundle() throws Exception { - return StepTransformResult.withoutHold(null).addOutput(outputBundle).build(); + return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java index 1e7282d..20b3ed5 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java @@ -25,7 +25,6 @@ import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -360,6 
+359,7 @@ public class Window { */ public static class Bound extends PTransform, PCollection> { + @Nullable private final WindowFn windowFn; @Nullable private final Trigger trigger; @Nullable private final AccumulationMode mode; @@ -587,21 +587,8 @@ public class Window { public PCollection apply(PCollection input) { WindowingStrategy outputStrategy = getOutputStrategyInternal(input.getWindowingStrategy()); - PCollection output; - if (windowFn != null) { - // If the windowFn changed, we create a primitive, and run the AssignWindows operation here. - output = assignWindows(input, windowFn); - } else { - // If the windowFn didn't change, we just run a pass-through transform and then set the - // new windowing strategy. - output = input.apply(Window.identity()); - } - return output.setWindowingStrategyInternal(outputStrategy); - } - - private PCollection assignWindows( - PCollection input, WindowFn windowFn) { - return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn(windowFn))); + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), outputStrategy, input.isBounded()); } @Override @@ -639,7 +626,7 @@ public class Window { * windows to be merged again as part of the next * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}. 
*/ - public static class Remerge extends PTransform, PCollection> { + private static class Remerge extends PTransform, PCollection> { @Override public PCollection apply(PCollection input) { WindowingStrategy outputWindowingStrategy = getOutputWindowing( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java new file mode 100644 index 0000000..57f489d --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.dataflow.sdk.util; + +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.values.PCollection; + +/** + * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a + * {@link PCollection} to windows according to the provided {@link WindowFn}. + * + * @param Type of elements being windowed + * @param Window type + */ +public class AssignWindows + extends PTransform, PCollection> { + + private WindowFn fn; + + public AssignWindows(WindowFn fn) { + this.fn = fn; + } + + @Override + public PCollection apply(PCollection input) { + return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn))); + } +}