Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E4F9E200B95 for ; Tue, 13 Sep 2016 02:40:56 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E36F5160AC8; Tue, 13 Sep 2016 00:40:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6B2C9160AD7 for ; Tue, 13 Sep 2016 02:40:54 +0200 (CEST) Received: (qmail 3535 invoked by uid 500); 13 Sep 2016 00:40:53 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 3526 invoked by uid 99); 13 Sep 2016 00:40:53 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2016 00:40:53 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id F25F61A731F for ; Tue, 13 Sep 2016 00:40:52 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.152 X-Spam-Level: X-Spam-Status: No, score=-4.152 tagged_above=-999 required=6.31 tests=[FUZZY_VPILL=0.494, KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id rovEX04ToRbQ for ; Tue, 13 Sep 2016 00:40:35 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org 
[140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id B46315FC3C for ; Tue, 13 Sep 2016 00:40:33 +0000 (UTC) Received: (qmail 98640 invoked by uid 99); 13 Sep 2016 00:40:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2016 00:40:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0A1D5E0230; Tue, 13 Sep 2016 00:40:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.incubator.apache.org Date: Tue, 13 Sep 2016 00:41:06 -0000 Message-Id: In-Reply-To: <5d2051da80084a09aedbe0b48aa93047@git.apache.org> References: <5d2051da80084a09aedbe0b48aa93047@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [35/50] [abbrv] incubator-beam git commit: Put classes in runners-core package into runners.core namespace archived-at: Tue, 13 Sep 2016 00:40:57 -0000 Put classes in runners-core package into runners.core namespace Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4bf3a3b3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4bf3a3b3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4bf3a3b3 Branch: refs/heads/gearpump-runner Commit: 4bf3a3b345d94ecea4c77ebdfaed9dd7ef0f39e5 Parents: 60d8cd9 Author: Kenneth Knowles Authored: Thu Aug 25 14:57:26 2016 -0700 Committer: Dan Halperin Committed: Mon Sep 12 17:40:12 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/core/AssignWindows.java | 46 + .../beam/runners/core/AssignWindowsDoFn.java | 79 + .../beam/runners/core/BatchTimerInternals.java | 140 ++ .../apache/beam/runners/core/DoFnRunner.java | 65 + .../beam/runners/core/DoFnRunnerBase.java | 559 +++++++ 
.../apache/beam/runners/core/DoFnRunners.java | 148 ++ .../runners/core/ElementByteSizeObservable.java | 44 + .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 5 +- .../runners/core/GroupAlsoByWindowsDoFn.java | 66 + .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 100 ++ .../core/GroupByKeyViaGroupByKeyOnly.java | 271 ++++ .../core/LateDataDroppingDoFnRunner.java | 151 ++ .../apache/beam/runners/core/NonEmptyPanes.java | 151 ++ .../beam/runners/core/PaneInfoTracker.java | 158 ++ .../beam/runners/core/PeekingReiterator.java | 100 ++ .../core/PushbackSideInputDoFnRunner.java | 116 ++ .../org/apache/beam/runners/core/ReduceFn.java | 130 ++ .../runners/core/ReduceFnContextFactory.java | 499 ++++++ .../beam/runners/core/ReduceFnRunner.java | 993 ++++++++++++ .../beam/runners/core/SimpleDoFnRunner.java | 58 + .../beam/runners/core/SystemReduceFn.java | 139 ++ .../apache/beam/runners/core/TriggerRunner.java | 247 +++ .../apache/beam/runners/core/WatermarkHold.java | 539 +++++++ .../org/apache/beam/sdk/util/AssignWindows.java | 46 - .../apache/beam/sdk/util/AssignWindowsDoFn.java | 77 - .../beam/sdk/util/BatchTimerInternals.java | 137 -- .../org/apache/beam/sdk/util/DoFnRunner.java | 63 - .../apache/beam/sdk/util/DoFnRunnerBase.java | 551 ------- .../org/apache/beam/sdk/util/DoFnRunners.java | 143 -- .../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 63 - .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 97 -- .../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 268 ---- .../sdk/util/LateDataDroppingDoFnRunner.java | 145 -- .../org/apache/beam/sdk/util/NonEmptyPanes.java | 150 -- .../apache/beam/sdk/util/PaneInfoTracker.java | 156 -- .../sdk/util/PushbackSideInputDoFnRunner.java | 113 -- .../java/org/apache/beam/sdk/util/ReduceFn.java | 128 -- .../beam/sdk/util/ReduceFnContextFactory.java | 493 ------ .../apache/beam/sdk/util/ReduceFnRunner.java | 983 ------------ .../apache/beam/sdk/util/SimpleDoFnRunner.java | 55 - .../apache/beam/sdk/util/SystemReduceFn.java | 138 -- 
.../org/apache/beam/sdk/util/TriggerRunner.java | 241 --- .../org/apache/beam/sdk/util/WatermarkHold.java | 536 ------- .../util/common/ElementByteSizeObservable.java | 42 - .../beam/sdk/util/common/PeekingReiterator.java | 99 -- .../beam/sdk/util/common/package-info.java | 20 - .../org/apache/beam/sdk/util/package-info.java | 20 - .../runners/core/BatchTimerInternalsTest.java | 118 ++ .../core/GroupAlsoByWindowsProperties.java | 660 ++++++++ ...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 110 ++ .../core/LateDataDroppingDoFnRunnerTest.java | 117 ++ .../core/PushbackSideInputDoFnRunnerTest.java | 235 +++ .../beam/runners/core/ReduceFnRunnerTest.java | 1446 ++++++++++++++++++ .../beam/runners/core/ReduceFnTester.java | 796 ++++++++++ .../beam/runners/core/SimpleDoFnRunnerTest.java | 88 ++ .../beam/sdk/util/BatchTimerInternalsTest.java | 117 -- .../sdk/util/GroupAlsoByWindowsProperties.java | 658 -------- ...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 109 -- .../util/LateDataDroppingDoFnRunnerTest.java | 114 -- .../util/PushbackSideInputDoFnRunnerTest.java | 231 --- .../beam/sdk/util/ReduceFnRunnerTest.java | 1442 ----------------- .../apache/beam/sdk/util/ReduceFnTester.java | 784 ---------- .../beam/sdk/util/SimpleDoFnRunnerTest.java | 84 - .../GroupAlsoByWindowEvaluatorFactory.java | 8 +- .../direct/GroupByKeyOnlyEvaluatorFactory.java | 4 +- .../beam/runners/direct/ParDoEvaluator.java | 8 +- .../direct/UncommittedBundleOutputManager.java | 2 +- .../FlinkStreamingTransformTranslators.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 6 +- .../wrappers/streaming/WindowDoFnOperator.java | 2 +- .../apache/beam/runners/spark/SparkRunner.java | 2 +- .../spark/translation/TransformTranslator.java | 10 +- .../streaming/StreamingTransformTranslator.java | 2 +- .../src/main/resources/beam/findbugs-filter.xml | 2 +- .../org/apache/beam/sdk/util/BitSetCoder.java | 2 +- 75 files changed, 8395 insertions(+), 8332 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java new file mode 100644 index 0000000..f2387f5 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a + * {@link PCollection} to windows according to the provided {@link WindowFn}. 
+ * + * @param Type of elements being windowed + * @param Window type + */ +public class AssignWindows + extends PTransform, PCollection> { + + private WindowFn fn; + + public AssignWindows(WindowFn fn) { + this.fn = fn; + } + + @Override + public PCollection apply(PCollection input) { + return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java new file mode 100644 index 0000000..0eb1667 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Iterables; +import java.util.Collection; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Instant; + +/** + * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the + * provided {@link WindowFn}. + * + * @param Type of elements being windowed + * @param Window type + */ +@SystemDoFnInternal +public class AssignWindowsDoFn extends OldDoFn + implements RequiresWindowAccess { + private WindowFn fn; + + public AssignWindowsDoFn(WindowFn fn) { + this.fn = + checkNotNull( + fn, + "%s provided to %s cannot be null", + WindowFn.class.getSimpleName(), + AssignWindowsDoFn.class.getSimpleName()); + } + + @Override + @SuppressWarnings("unchecked") + public void processElement(final ProcessContext c) throws Exception { + Collection windows = + ((WindowFn) fn).assignWindows( + ((WindowFn) fn).new AssignContext() { + @Override + public T element() { + return c.element(); + } + + @Override + public Instant timestamp() { + return c.timestamp(); + } + + @Override + public BoundedWindow window() { + return Iterables.getOnlyElement(c.windowingInternals().windows()); + } + }); + + c.windowingInternals() + .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java ---------------------------------------------------------------------- diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java new file mode 100644 index 0000000..829dbde --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.MoreObjects; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals; + +import org.joda.time.Instant; + +/** + * TimerInternals that uses priority queues to manage the timers that are ready to fire. + */ +public class BatchTimerInternals implements TimerInternals { + /** Set of timers that are scheduled used for deduplicating timers. */ + private Set existingTimers = new HashSet<>(); + + // Keep these queues separate so we can advance over them separately. 
+ private PriorityQueue watermarkTimers = new PriorityQueue<>(11); + private PriorityQueue processingTimers = new PriorityQueue<>(11); + + private Instant inputWatermarkTime; + private Instant processingTime; + + private PriorityQueue queue(TimeDomain domain) { + return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers; + } + + public BatchTimerInternals(Instant processingTime) { + this.processingTime = processingTime; + this.inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + @Override + public void setTimer(TimerData timer) { + if (existingTimers.add(timer)) { + queue(timer.getDomain()).add(timer); + } + } + + @Override + public void deleteTimer(TimerData timer) { + existingTimers.remove(timer); + queue(timer.getDomain()).remove(timer); + } + + @Override + public Instant currentProcessingTime() { + return processingTime; + } + + /** + * {@inheritDoc} + * + * @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, upstream processing + * is already complete. + */ + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + @Override + public Instant currentInputWatermarkTime() { + return inputWatermarkTime; + } + + @Override + @Nullable + public Instant currentOutputWatermarkTime() { + // The output watermark is always undefined in batch mode. 
+ return null; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("watermarkTimers", watermarkTimers) + .add("processingTimers", processingTimers) + .toString(); + } + + public void advanceInputWatermark(ReduceFnRunner runner, Instant newInputWatermark) + throws Exception { + checkState(!newInputWatermark.isBefore(inputWatermarkTime), + "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, + newInputWatermark); + inputWatermarkTime = newInputWatermark; + advance(runner, newInputWatermark, TimeDomain.EVENT_TIME); + } + + public void advanceProcessingTime(ReduceFnRunner runner, Instant newProcessingTime) + throws Exception { + checkState(!newProcessingTime.isBefore(processingTime), + "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime); + processingTime = newProcessingTime; + advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME); + } + + private void advance(ReduceFnRunner runner, Instant newTime, TimeDomain domain) + throws Exception { + PriorityQueue timers = queue(domain); + boolean shouldFire = false; + + do { + TimerData timer = timers.peek(); + // Timers fire if the new time is ahead of the timer + shouldFire = timer != null && newTime.isAfter(timer.getTimestamp()); + if (shouldFire) { + // Remove before firing, so that if the trigger adds another identical + // timer we don't remove it. 
+ timers.remove(); + runner.onTimer(timer); + } + } while (shouldFire); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java new file mode 100644 index 0000000..f4c8eea --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; + +/** + * An wrapper interface that represents the execution of a {@link OldDoFn}. + */ +public interface DoFnRunner { + /** + * Prepares and calls {@link OldDoFn#startBundle}. 
+ */ + public void startBundle(); + + /** + * Calls {@link OldDoFn#processElement} with a {@link ProcessContext} containing the current + * element. + */ + public void processElement(WindowedValue elem); + + /** + * Calls {@link OldDoFn#finishBundle} and performs additional tasks, such as + * flushing in-memory states. + */ + public void finishBundle(); + + /** + * An internal interface for signaling that a {@link OldDoFn} requires late data dropping. + */ + public interface ReduceFnExecutor { + /** + * Gets this object as a {@link OldDoFn}. + * + * Most implementors of this interface are expected to be {@link OldDoFn} instances, and will + * return themselves. + */ + OldDoFn, KV> asDoFn(); + + /** + * Returns an aggregator that tracks elements that are dropped due to being late. + */ + Aggregator getDroppedDueToLatenessAggregator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunnerBase.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunnerBase.java new file mode 100644 index 0000000..71472da --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunnerBase.java @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.ExecutionContext.StepContext; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.UserCodeException; +import 
org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.joda.time.format.PeriodFormat; + +/** + * A base implementation of {@link DoFnRunner}. + * + *

Sub-classes should override {@link #invokeProcessElement}. + */ +public abstract class DoFnRunnerBase implements DoFnRunner { + + /** The {@link OldDoFn} being run. */ + public final OldDoFn fn; + + /** The context used for running the {@link OldDoFn}. */ + public final DoFnContext context; + + protected DoFnRunnerBase( + PipelineOptions options, + OldDoFn fn, + SideInputReader sideInputReader, + OutputManager outputManager, + TupleTag mainOutputTag, + List> sideOutputTags, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowingStrategy windowingStrategy) { + this.fn = fn; + this.context = new DoFnContext<>( + options, + fn, + sideInputReader, + outputManager, + mainOutputTag, + sideOutputTags, + stepContext, + aggregatorFactory, + windowingStrategy == null ? null : windowingStrategy.getWindowFn()); + } + + /** + * An implementation of {@code OutputManager} using simple lists, for testing and in-memory + * contexts such as the {@code DirectRunner}. + */ + public static class ListOutputManager implements OutputManager { + + private Map, List>> outputLists = Maps.newHashMap(); + + @Override + public void output(TupleTag tag, WindowedValue output) { + @SuppressWarnings({"rawtypes", "unchecked"}) + List> outputList = (List) outputLists.get(tag); + + if (outputList == null) { + outputList = Lists.newArrayList(); + @SuppressWarnings({"rawtypes", "unchecked"}) + List> untypedList = (List) outputList; + outputLists.put(tag, untypedList); + } + + outputList.add(output); + } + + public List> getOutput(TupleTag tag) { + // Safe cast by design, inexpressible in Java without rawtypes + @SuppressWarnings({"rawtypes", "unchecked"}) + List> outputList = (List) outputLists.get(tag); + return (outputList != null) ? outputList : Collections.>emptyList(); + } + } + + @Override + public void startBundle() { + // This can contain user code. Wrap it in case it throws an exception. 
+ try { + fn.startBundle(context); + } catch (Throwable t) { + // Exception in user code. + throw wrapUserCodeException(t); + } + } + + @Override + public void processElement(WindowedValue elem) { + if (elem.getWindows().size() <= 1 + || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass()) + && context.sideInputReader.isEmpty())) { + invokeProcessElement(elem); + } else { + // We could modify the windowed value (and the processContext) to + // avoid repeated allocations, but this is more straightforward. + for (WindowedValue windowedValue : elem.explodeWindows()) { + invokeProcessElement(windowedValue); + } + } + } + + /** + * Invokes {@link OldDoFn#processElement} after certain pre-processings has been done in + * {@link DoFnRunnerBase#processElement}. + */ + protected abstract void invokeProcessElement(WindowedValue elem); + + @Override + public void finishBundle() { + // This can contain user code. Wrap it in case it throws an exception. + try { + fn.finishBundle(context); + } catch (Throwable t) { + // Exception in user code. + throw wrapUserCodeException(t); + } + } + + /** + * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}. + * + * @param the type of the {@link OldDoFn} (main) input elements + * @param the type of the {@link OldDoFn} (main) output elements + */ + private static class DoFnContext + extends OldDoFn.Context { + private static final int MAX_SIDE_OUTPUTS = 1000; + + final PipelineOptions options; + final OldDoFn fn; + final SideInputReader sideInputReader; + final OutputManager outputManager; + final TupleTag mainOutputTag; + final StepContext stepContext; + final AggregatorFactory aggregatorFactory; + final WindowFn windowFn; + + /** + * The set of known output tags, some of which may be undeclared, so we can throw an + * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}. 
+ */ + private Set> outputTags; + + public DoFnContext(PipelineOptions options, + OldDoFn fn, + SideInputReader sideInputReader, + OutputManager outputManager, + TupleTag mainOutputTag, + List> sideOutputTags, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowFn windowFn) { + fn.super(); + this.options = options; + this.fn = fn; + this.sideInputReader = sideInputReader; + this.outputManager = outputManager; + this.mainOutputTag = mainOutputTag; + this.outputTags = Sets.newHashSet(); + + outputTags.add(mainOutputTag); + for (TupleTag sideOutputTag : sideOutputTags) { + outputTags.add(sideOutputTag); + } + + this.stepContext = stepContext; + this.aggregatorFactory = aggregatorFactory; + this.windowFn = windowFn; + super.setupDelegateAggregators(); + } + + ////////////////////////////////////////////////////////////////////////////// + + @Override + public PipelineOptions getPipelineOptions() { + return options; + } + + WindowedValue makeWindowedValue( + T output, Instant timestamp, Collection windows, PaneInfo pane) { + final Instant inputTimestamp = timestamp; + + if (timestamp == null) { + timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + if (windows == null) { + try { + // The windowFn can never succeed at accessing the element, so its type does not + // matter here + @SuppressWarnings("unchecked") + WindowFn objectWindowFn = (WindowFn) windowFn; + windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() { + @Override + public Object element() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input element when none was available"); + } + + @Override + public Instant timestamp() { + if (inputTimestamp == null) { + throw new UnsupportedOperationException( + "WindowFn attempted to access input timestamp when none was available"); + } + return inputTimestamp; + } + + @Override + public W window() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input windows when none 
were available"); + } + }); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } + } + + return WindowedValue.of(output, timestamp, windows, pane); + } + + public T sideInput(PCollectionView view, BoundedWindow mainInputWindow) { + if (!sideInputReader.contains(view)) { + throw new IllegalArgumentException("calling sideInput() with unknown view"); + } + BoundedWindow sideInputWindow = + view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow); + return sideInputReader.get(view, sideInputWindow); + } + + void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo pane) { + outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane)); + } + + void outputWindowedValue(WindowedValue windowedElem) { + outputManager.output(mainOutputTag, windowedElem); + if (stepContext != null) { + stepContext.noteOutput(windowedElem); + } + } + + protected void sideOutputWindowedValue(TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo pane) { + sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); + } + + protected void sideOutputWindowedValue(TupleTag tag, WindowedValue windowedElem) { + if (!outputTags.contains(tag)) { + // This tag wasn't declared nor was it seen before during this execution. + // Thus, this must be a new, undeclared and unconsumed output. + // To prevent likely user errors, enforce the limit on the number of side + // outputs. 
+ if (outputTags.size() >= MAX_SIDE_OUTPUTS) { + throw new IllegalArgumentException( + "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS); + } + outputTags.add(tag); + } + + outputManager.output(tag, windowedElem); + if (stepContext != null) { + stepContext.noteSideOutput(tag, windowedElem); + } + } + + // Following implementations of output, outputWithTimestamp, and sideOutput + // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by + // ProcessContext's versions in OldDoFn.processElement. + @Override + public void output(OutputT output) { + outputWindowedValue(output, null, null, PaneInfo.NO_FIRING); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING); + } + + @Override + public void sideOutput(TupleTag tag, T output) { + checkNotNull(tag, "TupleTag passed to sideOutput cannot be null"); + sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING); + } + + @Override + public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null"); + sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING); + } + + @Override + protected Aggregator createAggregatorInternal( + String name, CombineFn combiner) { + checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null"); + return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner); + } + } + + /** + * Returns a new {@link OldDoFn.ProcessContext} for the given element. 
+ */ + protected OldDoFn.ProcessContext createProcessContext( + WindowedValue elem) { + return new DoFnProcessContext(fn, context, elem); + } + + protected RuntimeException wrapUserCodeException(Throwable t) { + throw UserCodeException.wrapIf(!isSystemDoFn(), t); + } + + private boolean isSystemDoFn() { + return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class); + } + + /** + * A concrete implementation of {@link OldDoFn.ProcessContext} used for + * running a {@link OldDoFn} over a single element. + * + * @param the type of the {@link OldDoFn} (main) input elements + * @param the type of the {@link OldDoFn} (main) output elements + */ + static class DoFnProcessContext + extends OldDoFn.ProcessContext { + + + final OldDoFn fn; + final DoFnContext context; + final WindowedValue windowedValue; + + public DoFnProcessContext(OldDoFn fn, + DoFnContext context, + WindowedValue windowedValue) { + fn.super(); + this.fn = fn; + this.context = context; + this.windowedValue = windowedValue; + } + + @Override + public PipelineOptions getPipelineOptions() { + return context.getPipelineOptions(); + } + + @Override + public InputT element() { + return windowedValue.getValue(); + } + + @Override + public T sideInput(PCollectionView view) { + checkNotNull(view, "View passed to sideInput cannot be null"); + Iterator windowIter = windows().iterator(); + BoundedWindow window; + if (!windowIter.hasNext()) { + if (context.windowFn instanceof GlobalWindows) { + // TODO: Remove this once GroupByKeyOnly no longer outputs elements + // without windows + window = GlobalWindow.INSTANCE; + } else { + throw new IllegalStateException( + "sideInput called when main input element is not in any windows"); + } + } else { + window = windowIter.next(); + if (windowIter.hasNext()) { + throw new IllegalStateException( + "sideInput called when main input element is in multiple windows"); + } + } + return context.sideInput(view, window); + } + + @Override + public BoundedWindow window() { + if 
(!(fn instanceof RequiresWindowAccess)) { + throw new UnsupportedOperationException( + "window() is only available in the context of a OldDoFn marked as" + + "RequiresWindowAccess."); + } + return Iterables.getOnlyElement(windows()); + } + + @Override + public PaneInfo pane() { + return windowedValue.getPane(); + } + + @Override + public void output(OutputT output) { + context.outputWindowedValue(windowedValue.withValue(output)); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + checkTimestamp(timestamp); + context.outputWindowedValue(output, timestamp, + windowedValue.getWindows(), windowedValue.getPane()); + } + + void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo pane) { + context.outputWindowedValue(output, timestamp, windows, pane); + } + + @Override + public void sideOutput(TupleTag tag, T output) { + checkNotNull(tag, "Tag passed to sideOutput cannot be null"); + context.sideOutputWindowedValue(tag, windowedValue.withValue(output)); + } + + @Override + public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null"); + checkTimestamp(timestamp); + context.sideOutputWindowedValue( + tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); + } + + @Override + public Instant timestamp() { + return windowedValue.getTimestamp(); + } + + public Collection windows() { + return windowedValue.getWindows(); + } + + private void checkTimestamp(Instant timestamp) { + if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) { + throw new IllegalArgumentException(String.format( + "Cannot output with timestamp %s. Output timestamps must be no earlier than the " + + "timestamp of the current input (%s) minus the allowed skew (%s). 
See the " + + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.", + timestamp, windowedValue.getTimestamp(), + PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()))); + } + } + + @Override + public WindowingInternals windowingInternals() { + return new WindowingInternals() { + @Override + public void outputWindowedValue(OutputT output, Instant timestamp, + Collection windows, PaneInfo pane) { + context.outputWindowedValue(output, timestamp, windows, pane); + } + + @Override + public Collection windows() { + return windowedValue.getWindows(); + } + + @Override + public PaneInfo pane() { + return windowedValue.getPane(); + } + + @Override + public TimerInternals timerInternals() { + return context.stepContext.timerInternals(); + } + + @Override + public void writePCollectionViewData( + TupleTag tag, + Iterable> data, + Coder elemCoder) throws IOException { + @SuppressWarnings("unchecked") + Coder windowCoder = (Coder) context.windowFn.windowCoder(); + + context.stepContext.writePCollectionViewData( + tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)), + window(), windowCoder); + } + + @Override + public StateInternals stateInternals() { + return context.stepContext.stateInternals(); + } + + @Override + public T sideInput(PCollectionView view, BoundedWindow mainInputWindow) { + return context.sideInput(view, mainInputWindow); + } + }; + } + + @Override + protected Aggregator + createAggregatorInternal( + String name, CombineFn combiner) { + return context.createAggregatorInternal(name, combiner); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java new file mode 100644 index 0000000..7726374 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.util.List; + +import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.ExecutionContext.StepContext; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Static utility methods that provide {@link DoFnRunner} implementations. + */ +public class DoFnRunners { + /** + * Information about how to create output receivers and output to them. 
+ */ + public interface OutputManager { + /** + * Outputs a single element to the receiver indicated by the given {@link TupleTag}. + */ + public void output(TupleTag tag, WindowedValue output); + } + + /** + * Returns a basic implementation of {@link DoFnRunner} that works for most {@link OldDoFn DoFns}. + * + *

It invokes {@link OldDoFn#processElement} for each input. + */ + public static DoFnRunner simpleRunner( + PipelineOptions options, + OldDoFn fn, + SideInputReader sideInputReader, + OutputManager outputManager, + TupleTag mainOutputTag, + List> sideOutputTags, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowingStrategy windowingStrategy) { + return new SimpleDoFnRunner<>( + options, + fn, + sideInputReader, + outputManager, + mainOutputTag, + sideOutputTags, + stepContext, + aggregatorFactory, + windowingStrategy); + } + + /** + * Returns an implementation of {@link DoFnRunner} that handles late data dropping. + * + *

It drops elements from expired windows before they reach the underlying {@link OldDoFn}. + */ + public static + DoFnRunner, KV> lateDataDroppingRunner( + PipelineOptions options, + ReduceFnExecutor reduceFnExecutor, + SideInputReader sideInputReader, + OutputManager outputManager, + TupleTag> mainOutputTag, + List> sideOutputTags, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowingStrategy windowingStrategy) { + DoFnRunner, KV> simpleDoFnRunner = + simpleRunner( + options, + reduceFnExecutor.asDoFn(), + sideInputReader, + outputManager, + mainOutputTag, + sideOutputTags, + stepContext, + aggregatorFactory, + windowingStrategy); + return new LateDataDroppingDoFnRunner<>( + simpleDoFnRunner, + windowingStrategy, + stepContext.timerInternals(), + reduceFnExecutor.getDroppedDueToLatenessAggregator()); + } + + + public static DoFnRunner createDefault( + PipelineOptions options, + OldDoFn doFn, + SideInputReader sideInputReader, + OutputManager outputManager, + TupleTag mainOutputTag, + List> sideOutputTags, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowingStrategy windowingStrategy) { + if (doFn instanceof ReduceFnExecutor) { + @SuppressWarnings("rawtypes") + ReduceFnExecutor fn = (ReduceFnExecutor) doFn; + @SuppressWarnings({"unchecked", "cast", "rawtypes"}) + DoFnRunner runner = (DoFnRunner) lateDataDroppingRunner( + options, + fn, + sideInputReader, + outputManager, + (TupleTag) mainOutputTag, + sideOutputTags, + stepContext, + aggregatorFactory, + (WindowingStrategy) windowingStrategy); + return runner; + } + return simpleRunner( + options, + doFn, + sideInputReader, + outputManager, + mainOutputTag, + sideOutputTags, + stepContext, + aggregatorFactory, + windowingStrategy); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementByteSizeObservable.java ---------------------------------------------------------------------- diff 
--git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementByteSizeObservable.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementByteSizeObservable.java new file mode 100644 index 0000000..2380ba9 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementByteSizeObservable.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; + +/** + * An interface for things that allow observing the size in bytes of + * encoded values of type {@code T}. + * + * @param the type of the values being observed + */ +public interface ElementByteSizeObservable { + /** + * Returns whether {@link #registerByteSizeObserver} is cheap enough + * to call for every element, that is, if this + * {@code ElementByteSizeObservable} can calculate the byte size of + * the element to be coded in roughly constant time (or lazily). + */ + public boolean isRegisterByteSizeObserverCheap(T value); + + /** + * Notifies the {@code ElementByteSizeObserver} about the byte size + * of the encoded value using this {@code ElementByteSizeObservable}. 
+ */ + public void registerByteSizeObserver(T value, + ElementByteSizeObserver observer) + throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index 7cdab00..b427037 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -17,16 +17,13 @@ */ package org.apache.beam.runners.core; +import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor; -import org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn; import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.ReduceFnRunner; import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.SystemReduceFn; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java new file mode 100644 index 0000000..9851449 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.values.KV; + +/** + * {@link OldDoFn} that merges windows and groups elements in those windows, optionally + * combining values. 
+ * + * @param key type + * @param input value element type + * @param output value element type + * @param window type + */ +@SystemDoFnInternal +public abstract class GroupAlsoByWindowsDoFn + extends OldDoFn>>, KV> { + public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow"; + public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness"; + + protected final Aggregator droppedDueToClosedWindow = + createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn()); + protected final Aggregator droppedDueToLateness = + createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn()); + + /** + * Create the default {@link GroupAlsoByWindowsDoFn}, which uses window sets to implement the + * grouping. + * + * @param windowingStrategy The window function and trigger to use for grouping + * @param inputCoder the input coder to use + */ + public static + GroupAlsoByWindowsDoFn, W> createDefault( + WindowingStrategy windowingStrategy, + StateInternalsFactory stateInternalsFactory, + Coder inputCoder) { + return new GroupAlsoByWindowsViaOutputBufferDoFn<>( + windowingStrategy, stateInternalsFactory, SystemReduceFn.buffering(inputCoder)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java new file mode 100644 index 0000000..091ad33 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.collect.Iterables; +import java.util.List; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; + +/** + * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path" + * implementation is applicable. 
+ */ +@SystemDoFnInternal +public class GroupAlsoByWindowsViaOutputBufferDoFn + extends GroupAlsoByWindowsDoFn { + + private final WindowingStrategy strategy; + private final StateInternalsFactory stateInternalsFactory; + private SystemReduceFn reduceFn; + + public GroupAlsoByWindowsViaOutputBufferDoFn( + WindowingStrategy windowingStrategy, + StateInternalsFactory stateInternalsFactory, + SystemReduceFn reduceFn) { + this.strategy = windowingStrategy; + this.reduceFn = reduceFn; + this.stateInternalsFactory = stateInternalsFactory; + } + + @Override + public void processElement( + OldDoFn>>, KV>.ProcessContext c) + throws Exception { + K key = c.element().getKey(); + // Used with Batch, we know that all the data is available for this key. We can't use the + // timer manager from the context because it doesn't exist. So we create one and emulate the + // watermark, knowing that we have all data and it is in timestamp order. + BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now()); + StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); + + ReduceFnRunner reduceFnRunner = + new ReduceFnRunner( + key, + strategy, + stateInternals, + timerInternals, + c.windowingInternals(), + droppedDueToClosedWindow, + reduceFn, + c.getPipelineOptions()); + + Iterable>> chunks = + Iterables.partition(c.element().getValue(), 1000); + for (Iterable> chunk : chunks) { + // Process the chunk of elements. + reduceFnRunner.processElements(chunk); + + // Then, since elements are sorted by their timestamp, advance the input watermark + // to the first element, and fire any timers that may have been scheduled. + timerInternals.advanceInputWatermark(reduceFnRunner, chunk.iterator().next().getTimestamp()); + + // Fire any processing timers that need to fire + timerInternals.advanceProcessingTime(reduceFnRunner, Instant.now()); + + // Leave the output watermark undefined. 
Since there's no late data in batch mode + // there's really no need to track it as we do for streaming. + } + + // Finish any pending windows by advancing the input watermark to infinity. + timerInternals.advanceInputWatermark(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE); + + // Finally, advance the processing time to infinity to fire any timers. + timerInternals.advanceProcessingTime(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE); + + reduceFnRunner.persist(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java new file mode 100644 index 0000000..b521425 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.ReifyTimestampsAndWindows; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * An implementation of {@link GroupByKey} built on top of a lower-level {@link GroupByKeyOnly} + * primitive. + * + *

This implementation of {@link GroupByKey} proceeds via the following steps: + *

    + *
  1. {@code ReifyTimestampsAndWindowsDoFn ParDo(ReifyTimestampsAndWindows)}: This embeds + * the previously-implicit timestamp and window into the elements themselves, so a + * window-and-timestamp-unaware transform can operate on them.
    + *
  2. {@code GroupByKeyOnly}: This lower-level primitive groups by keys, ignoring windows + * and timestamps. Many window-unaware runners have such a primitive already.
    + *
  3. {@code SortValuesByTimestamp ParDo(SortValuesByTimestamp)}: The values in the iterables + * output by {@link GroupByKeyOnly} are sorted by timestamp.
    + *
  4. {@code GroupAlsoByWindow}: This primitive processes the sorted values. Today it is + * implemented as a {@link ParDo} that calls reserved internal methods.
    + *
+ * + *

This implementation of {@link GroupByKey} has severe limitations unless its component + * transforms are replaced. As-is, it is only applicable for in-memory runners using a batch-style + * execution strategy. Specifically: + * + *

    + *
  • Every iterable output by {@link GroupByKeyOnly} must contain all elements for that key. + * A streaming-style partition, with multiple elements for the same key, will not yield + * correct results.
    + *
  • Sorting of values by timestamp is performed on an in-memory list. It will not succeed + * for large iterables.
    + *
  • The implementation of {@code GroupAlsoByWindow} does not support timers. This is only + * appropriate for runners which also do not support timers.
    + *
+ */ +public class GroupByKeyViaGroupByKeyOnly + extends PTransform>, PCollection>>> { + + private final GroupByKey gbkTransform; + + public GroupByKeyViaGroupByKeyOnly(GroupByKey originalTransform) { + this.gbkTransform = originalTransform; + } + + @Override + public PCollection>> apply(PCollection> input) { + WindowingStrategy windowingStrategy = input.getWindowingStrategy(); + + return input + // Make each input element's timestamp and assigned windows + // explicit, in the value part. + .apply(new ReifyTimestampsAndWindows()) + + // Group by just the key. + // Combiner lifting will not happen regardless of the disallowCombinerLifting value. + // There will be no combiners right after the GroupByKeyOnly because of the two ParDos + // introduced in here. + .apply(new GroupByKeyOnly>()) + + // Sort each key's values by timestamp. GroupAlsoByWindow requires + // its input to be sorted by timestamp. + .apply(new SortValuesByTimestamp()) + + // Group each key's values by window, merging windows as needed. + .apply(new GroupAlsoByWindow(windowingStrategy)) + + // And update the windowing strategy as appropriate. + .setWindowingStrategyInternal( + gbkTransform.updateWindowingStrategy(windowingStrategy)); + } + + /** + * Runner-specific primitive that groups by key only, ignoring any window assignments. A + * runner that uses {@link GroupByKeyViaGroupByKeyOnly} should have a primitive way to translate + * or evaluate this class. 
+ */ + public static class GroupByKeyOnly + extends PTransform>, PCollection>>> { + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public PCollection>> apply(PCollection> input) { + return PCollection.>>createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + } + + @Override + public Coder>> getDefaultOutputCoder(PCollection> input) { + return GroupByKey.getOutputKvCoder(input.getCoder()); + } + } + + /** + * Helper transform that sorts the values associated with each key by timestamp. + */ + private static class SortValuesByTimestamp + extends PTransform< + PCollection>>>, + PCollection>>>> { + @Override + public PCollection>>> apply( + PCollection>>> input) { + return input + .apply( + ParDo.of( + new OldDoFn< + KV>>, + KV>>>() { + @Override + public void processElement(ProcessContext c) { + KV>> kvs = c.element(); + K key = kvs.getKey(); + Iterable> unsortedValues = kvs.getValue(); + List> sortedValues = new ArrayList<>(); + for (WindowedValue value : unsortedValues) { + sortedValues.add(value); + } + Collections.sort( + sortedValues, + new Comparator>() { + @Override + public int compare(WindowedValue e1, WindowedValue e2) { + return e1.getTimestamp().compareTo(e2.getTimestamp()); + } + }); + c.output(KV.>>of(key, sortedValues)); + } + })) + .setCoder(input.getCoder()); + } + } + + /** + * Runner-specific primitive that takes a collection of timestamp-ordered values associated with + * each key, groups the values by window, merges windows as needed, and for each window in each + * key, outputs a collection of key/value-list pairs implicitly assigned to the window and with + * the timestamp derived from that window. 
+ */ + public static class GroupAlsoByWindow + extends PTransform< + PCollection>>>, PCollection>>> { + private final WindowingStrategy windowingStrategy; + + public GroupAlsoByWindow(WindowingStrategy windowingStrategy) { + this.windowingStrategy = windowingStrategy; + } + + public WindowingStrategy getWindowingStrategy() { + return windowingStrategy; + } + + private KvCoder>> getKvCoder( + Coder>>> inputCoder) { + // Coder> --> KvCoder<...> + checkArgument(inputCoder instanceof KvCoder, + "%s requires a %s<...> but got %s", + getClass().getSimpleName(), + KvCoder.class.getSimpleName(), + inputCoder); + @SuppressWarnings("unchecked") + KvCoder>> kvCoder = + (KvCoder>>) inputCoder; + return kvCoder; + } + + public Coder getKeyCoder(Coder>>> inputCoder) { + return getKvCoder(inputCoder).getKeyCoder(); + } + + public Coder getValueCoder(Coder>>> inputCoder) { + // Coder> --> IterableCoder<...> + Coder>> iterableWindowedValueCoder = + getKvCoder(inputCoder).getValueCoder(); + checkArgument(iterableWindowedValueCoder instanceof IterableCoder, + "%s requires a %s<..., %s> but got a %s", + getClass().getSimpleName(), + KvCoder.class.getSimpleName(), + IterableCoder.class.getSimpleName(), + iterableWindowedValueCoder); + IterableCoder> iterableCoder = + (IterableCoder>) iterableWindowedValueCoder; + + // Coder> --> WindowedValueCoder<...> + Coder> iterableElementCoder = iterableCoder.getElemCoder(); + checkArgument(iterableElementCoder instanceof WindowedValueCoder, + "%s requires a %s<..., %s<%s>> but got a %s", + getClass().getSimpleName(), + KvCoder.class.getSimpleName(), + IterableCoder.class.getSimpleName(), + WindowedValueCoder.class.getSimpleName(), + iterableElementCoder); + WindowedValueCoder windowedValueCoder = + (WindowedValueCoder) iterableElementCoder; + + return windowedValueCoder.getValueCoder(); + } + + @Override + public PCollection>> apply( + PCollection>>> input) { + @SuppressWarnings("unchecked") + KvCoder>> inputKvCoder = + (KvCoder>>) 
input.getCoder(); + + Coder keyCoder = inputKvCoder.getKeyCoder(); + Coder>> inputValueCoder = inputKvCoder.getValueCoder(); + + IterableCoder> inputIterableValueCoder = + (IterableCoder>) inputValueCoder; + Coder> inputIterableElementCoder = inputIterableValueCoder.getElemCoder(); + WindowedValueCoder inputIterableWindowedValueCoder = + (WindowedValueCoder) inputIterableElementCoder; + + Coder inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder(); + Coder> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder); + Coder>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder); + + return PCollection.>>createPrimitiveOutputInternal( + input.getPipeline(), windowingStrategy, input.isBounded()) + .setCoder(outputKvCoder); + } + + private + GroupAlsoByWindowsViaOutputBufferDoFn, W> groupAlsoByWindowsFn( + WindowingStrategy strategy, + StateInternalsFactory stateInternalsFactory, + Coder inputIterableElementValueCoder) { + return new GroupAlsoByWindowsViaOutputBufferDoFn, W>( + strategy, + stateInternalsFactory, + SystemReduceFn.buffering(inputIterableElementValueCoder)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java new file mode 100644 index 0000000..63a80d2 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItems; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; + +/** + * A customized {@link DoFnRunner} that handles late data dropping for + * a {@link KeyedWorkItem} input {@link OldDoFn}. + * + *

<p>It expands windows before checking data lateness. + * + *
<p>
{@link KeyedWorkItem KeyedWorkItems} are always in empty windows. + * + * @param key type + * @param input value element type + * @param output value element type + * @param window type + */ +public class LateDataDroppingDoFnRunner + implements DoFnRunner, KV> { + private final DoFnRunner, KV> doFnRunner; + private final LateDataFilter lateDataFilter; + + public LateDataDroppingDoFnRunner( + DoFnRunner, KV> doFnRunner, + WindowingStrategy windowingStrategy, + TimerInternals timerInternals, + Aggregator droppedDueToLateness) { + this.doFnRunner = doFnRunner; + lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals, droppedDueToLateness); + } + + @Override + public void startBundle() { + doFnRunner.startBundle(); + } + + @Override + public void processElement(WindowedValue> elem) { + Iterable> nonLateElements = lateDataFilter.filter( + elem.getValue().key(), elem.getValue().elementsIterable()); + KeyedWorkItem keyedWorkItem = KeyedWorkItems.workItem( + elem.getValue().key(), elem.getValue().timersIterable(), nonLateElements); + doFnRunner.processElement(elem.withValue(keyedWorkItem)); + } + + @Override + public void finishBundle() { + doFnRunner.finishBundle(); + } + + /** + * It filters late data in a {@link KeyedWorkItem}. + */ + @VisibleForTesting + static class LateDataFilter { + private final WindowingStrategy windowingStrategy; + private final TimerInternals timerInternals; + private final Aggregator droppedDueToLateness; + + public LateDataFilter( + WindowingStrategy windowingStrategy, + TimerInternals timerInternals, + Aggregator droppedDueToLateness) { + this.windowingStrategy = windowingStrategy; + this.timerInternals = timerInternals; + this.droppedDueToLateness = droppedDueToLateness; + } + + /** + * Returns an {@code Iterable>} that only contains + * non-late input elements. 
+ */ + public Iterable> filter( + final K key, Iterable> elements) { + Iterable>> windowsExpandedElements = Iterables.transform( + elements, + new Function, Iterable>>() { + @Override + public Iterable> apply(final WindowedValue input) { + return Iterables.transform( + input.getWindows(), + new Function>() { + @Override + public WindowedValue apply(BoundedWindow window) { + return WindowedValue.of( + input.getValue(), input.getTimestamp(), window, input.getPane()); + } + }); + }}); + + Iterable> nonLateElements = Iterables.filter( + Iterables.concat(windowsExpandedElements), + new Predicate>() { + @Override + public boolean apply(WindowedValue input) { + BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); + if (canDropDueToExpiredWindow(window)) { + // The element is too late for this window. + droppedDueToLateness.addValue(1L); + WindowTracing.debug( + "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " + + "since too far behind inputWatermark:{}; outputWatermark:{}", + input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + return false; + } else { + return true; + } + } + }); + return nonLateElements; + } + + /** Is {@code window} expired w.r.t. the garbage collection watermark? 
*/ + private boolean canDropDueToExpiredWindow(BoundedWindow window) { + Instant inputWM = timerInternals.currentInputWatermarkTime(); + return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java new file mode 100644 index 0000000..3e51dfb --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateMerging; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; + +/** + * Tracks which windows have non-empty panes. Specifically, which windows have new elements since + * their last triggering. + * + * @param The kind of windows being tracked. + */ +public abstract class NonEmptyPanes { + + static NonEmptyPanes create( + WindowingStrategy strategy, ReduceFn reduceFn) { + if (strategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) { + return new DiscardingModeNonEmptyPanes<>(reduceFn); + } else { + return new GeneralNonEmptyPanes<>(); + } + } + + /** + * Record that some content has been added to the window in {@code context}, and therefore the + * current pane is not empty. + */ + public abstract void recordContent(StateAccessor context); + + /** + * Record that the given pane is empty. + */ + public abstract void clearPane(StateAccessor state); + + /** + * Return true if the current pane for the window in {@code context} is empty. + */ + public abstract ReadableState isEmpty(StateAccessor context); + + /** + * Prefetch in preparation for merging. + */ + public abstract void prefetchOnMerge(MergingStateAccessor state); + + /** + * Eagerly merge backing state. + */ + public abstract void onMerge(MergingStateAccessor context); + + /** + * An implementation of {@code NonEmptyPanes} optimized for use with discarding mode. 
Uses the + * presence of data in the accumulation buffer to record non-empty panes. + */ + private static class DiscardingModeNonEmptyPanes + extends NonEmptyPanes { + + private ReduceFn reduceFn; + + private DiscardingModeNonEmptyPanes(ReduceFn reduceFn) { + this.reduceFn = reduceFn; + } + + @Override + public ReadableState isEmpty(StateAccessor state) { + return reduceFn.isEmpty(state); + } + + @Override + public void recordContent(StateAccessor state) { + // Nothing to do -- the reduceFn is tracking contents + } + + @Override + public void clearPane(StateAccessor state) { + // Nothing to do -- the reduceFn is tracking contents + } + + @Override + public void prefetchOnMerge(MergingStateAccessor state) { + // Nothing to do -- the reduceFn is tracking contents + } + + @Override + public void onMerge(MergingStateAccessor context) { + // Nothing to do -- the reduceFn is tracking contents + } + } + + /** + * An implementation of {@code NonEmptyPanes} for general use. + */ + private static class GeneralNonEmptyPanes + extends NonEmptyPanes { + + private static final StateTag> + PANE_ADDITIONS_TAG = + StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( + "count", VarLongCoder.of(), new Sum.SumLongFn())); + + @Override + public void recordContent(StateAccessor state) { + state.access(PANE_ADDITIONS_TAG).add(1L); + } + + @Override + public void clearPane(StateAccessor state) { + state.access(PANE_ADDITIONS_TAG).clear(); + } + + @Override + public ReadableState isEmpty(StateAccessor state) { + return state.access(PANE_ADDITIONS_TAG).isEmpty(); + } + + @Override + public void prefetchOnMerge(MergingStateAccessor state) { + StateMerging.prefetchCombiningValues(state, PANE_ADDITIONS_TAG); + } + + @Override + public void onMerge(MergingStateAccessor context) { + StateMerging.mergeCombiningValues(context, PANE_ADDITIONS_TAG); + } + } +}