Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id ADD64200BC5 for ; Tue, 8 Nov 2016 00:48:38 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id AC59B160AF9; Mon, 7 Nov 2016 23:48:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A539F160AEC for ; Tue, 8 Nov 2016 00:48:37 +0100 (CET) Received: (qmail 44978 invoked by uid 500); 7 Nov 2016 23:48:36 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 44969 invoked by uid 99); 7 Nov 2016 23:48:36 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Nov 2016 23:48:36 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 0B72AC00B6 for ; Mon, 7 Nov 2016 23:48:36 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id Fj36BL7tN0BH for ; Mon, 7 Nov 2016 23:48:33 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id AFC0E5F1B8 for ; Mon, 7 Nov 2016 23:48:31 +0000 (UTC) Received: (qmail 44720 invoked by uid 99); 7 Nov 2016 23:48:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Nov 2016 23:48:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D267AEC22D; Mon, 7 Nov 2016 23:48:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tgroh@apache.org To: commits@beam.incubator.apache.org Date: Mon, 07 Nov 2016 23:48:31 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] incubator-beam git commit: Track Minimum Element Timestamps within Bundles archived-at: Mon, 07 Nov 2016 23:48:38 -0000 Track Minimum Element Timestamps within Bundles This allows the Watermark Manager to track pending elements by bundles of elements rather than per-element, which significantly reduces the amount of work done per-element to track watermarks. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a58f1eba Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a58f1eba Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a58f1eba Branch: refs/heads/master Commit: a58f1eba1a3b5ae453f4d65ad409785cb717b2ae Parents: 317b5e6 Author: Thomas Groh Authored: Fri Nov 4 12:54:00 2016 -0700 Committer: Thomas Groh Committed: Mon Nov 7 15:47:02 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 5 ++ .../direct/ImmutableListBundleFactory.java | 21 ++++++++- .../beam/runners/direct/WatermarkManager.java | 48 +++++++++----------- .../direct/ImmutableListBundleFactoryTest.java | 15 +++++- 4 files changed, 60 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 44d1986..4d5a449 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -151,6 +151,11 @@ public class DirectRunner Iterable> getElements(); /** + * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}. + */ + Instant getMinTimestamp(); + + /** * Returns the processing time output watermark at the time the producing {@link PTransform} * committed this bundle. Downstream synchronized processing time watermarks cannot progress * past this point before consuming this bundle. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java index abc6dd8..6b342d6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Instant; @@ -64,6 +65,7 @@ class ImmutableListBundleFactory implements BundleFactory { private final StructuralKey key; private boolean committed = false; private ImmutableList.Builder> elements; + private Instant minSoFar = BoundedWindow.TIMESTAMP_MAX_VALUE; /** * Create a new {@link UncommittedImmutableListBundle} for the specified {@link PCollection}. @@ -93,6 +95,9 @@ class ImmutableListBundleFactory implements BundleFactory { element, pcollection); elements.add(element); + if (element.getTimestamp().isBefore(minSoFar)) { + minSoFar = element.getTimestamp(); + } return this; } @@ -102,7 +107,7 @@ class ImmutableListBundleFactory implements BundleFactory { committed = true; final Iterable> committedElements = elements.build(); return CommittedImmutableListBundle.create( - pcollection, key, committedElements, synchronizedCompletionTime); + pcollection, key, committedElements, minSoFar, synchronizedCompletionTime); } } @@ -112,9 +117,10 @@ class ImmutableListBundleFactory implements BundleFactory { @Nullable PCollection pcollection, StructuralKey key, Iterable> committedElements, + Instant minElementTimestamp, Instant synchronizedCompletionTime) { return new AutoValue_ImmutableListBundleFactory_CommittedImmutableListBundle<>( - pcollection, key, committedElements, synchronizedCompletionTime); + pcollection, key, committedElements, minElementTimestamp, synchronizedCompletionTime); } @Override @@ -123,6 +129,7 @@ class ImmutableListBundleFactory implements BundleFactory { getPCollection(), getKey(), ImmutableList.copyOf(elements), + minTimestamp(elements), getSynchronizedProcessingOutputWatermark()); } @@ -136,4 +143,14 @@ class ImmutableListBundleFactory implements BundleFactory { return this == obj; } } + + private static Instant minTimestamp(Iterable> elements) { + Instant minTs = BoundedWindow.TIMESTAMP_MAX_VALUE; + for (WindowedValue element : elements) { + if (element.getTimestamp().isBefore(minTs)) { + minTs = element.getTimestamp(); + } + } + return minTs; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 2228cd5..f235af0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -27,8 +27,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; -import com.google.common.collect.SortedMultiset; -import com.google.common.collect.TreeMultiset; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -55,7 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; import org.joda.time.Instant; @@ -123,6 +120,10 @@ import org.joda.time.Instant; * */ public class WatermarkManager { + // The number of updates to apply in #tryApplyPendingUpdates + private static final int MAX_INCREMENTAL_UPDATES = 10; + + /** * The watermark of some {@link Pipeline} element, usually a {@link PTransform} or a * {@link PCollection}. @@ -203,7 +204,7 @@ public class WatermarkManager { */ private static class AppliedPTransformInputWatermark implements Watermark { private final Collection inputWatermarks; - private final SortedMultiset> pendingElements; + private final NavigableSet> pendingElements; private final Map, NavigableSet> objectTimers; private AtomicReference currentWatermark; @@ -213,10 +214,10 @@ public class WatermarkManager { // The ordering must order elements by timestamp, and must not compare two distinct elements // as equal. This is built on the assumption that any element added as a pending element will // be consumed without modifications. - Ordering> pendingElementComparator = - new WindowedValueByTimestampComparator().compound(Ordering.arbitrary()); + Ordering> pendingBundleComparator = + new BundleByElementTimestampComparator().compound(Ordering.arbitrary()); this.pendingElements = - TreeMultiset.create(pendingElementComparator); + new TreeSet<>(pendingBundleComparator); this.objectTimers = new HashMap<>(); currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); } @@ -249,25 +250,20 @@ public class WatermarkManager { minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, inputWatermark.get()); } if (!pendingElements.isEmpty()) { - minInputWatermark = INSTANT_ORDERING.min( - minInputWatermark, pendingElements.firstEntry().getElement().getTimestamp()); + minInputWatermark = + INSTANT_ORDERING.min(minInputWatermark, pendingElements.first().getMinTimestamp()); } Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark); currentWatermark.set(newWatermark); return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark); } - private synchronized void addPendingElements(Iterable> newPending) { - for (WindowedValue pendingElement : newPending) { - pendingElements.add(pendingElement); - } + private synchronized void addPending(CommittedBundle newPending) { + pendingElements.add(newPending); } - private synchronized void removePendingElements( - Iterable> finishedElements) { - for (WindowedValue finishedElement : finishedElements) { - pendingElements.remove(finishedElement); - } + private synchronized void removePending(CommittedBundle completed) { + pendingElements.remove(completed); } private synchronized void updateTimers(TimerUpdate update) { @@ -856,7 +852,7 @@ public class WatermarkManager { private void tryApplyPendingUpdates() { if (refreshLock.tryLock()) { try { - applyNUpdates(10); + applyNUpdates(MAX_INCREMENTAL_UPDATES); } finally { refreshLock.unlock(); } @@ -867,7 +863,7 @@ public class WatermarkManager { * Applies all pending updates to this {@link WatermarkManager}, causing the pending state * of all {@link TransformWatermarks} to be advanced as far as possible. */ - private void applyPendingUpdates() { + private void applyAllPendingUpdates() { refreshLock.lock(); try { applyNUpdates(-1); @@ -944,7 +940,7 @@ public class WatermarkManager { synchronized void refreshAll() { refreshLock.lock(); try { - applyPendingUpdates(); + applyAllPendingUpdates(); Set> toRefresh = pendingRefreshes; while (!toRefresh.isEmpty()) { toRefresh = refreshAllOf(toRefresh); @@ -1180,12 +1176,12 @@ public class WatermarkManager { } private void removePending(CommittedBundle bundle) { - inputWatermark.removePendingElements(bundle.getElements()); + inputWatermark.removePending(bundle); synchronizedProcessingInputWatermark.removePending(bundle); } private void addPending(CommittedBundle bundle) { - inputWatermark.addPendingElements(bundle.getElements()); + inputWatermark.addPending(bundle); synchronizedProcessingInputWatermark.addPending(bundle); } @@ -1434,11 +1430,11 @@ public class WatermarkManager { } } - private static class WindowedValueByTimestampComparator extends Ordering> { + private static class BundleByElementTimestampComparator extends Ordering> { @Override - public int compare(WindowedValue o1, WindowedValue o2) { + public int compare(CommittedBundle o1, CommittedBundle o2) { return ComparisonChain.start() - .compare(o1.getTimestamp(), o2.getTimestamp()) + .compare(o1.getMinTimestamp(), o2.getMinTimestamp()) .result(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java index 4a7477f..a36c408 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; @@ -35,6 +36,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -95,15 +97,25 @@ public class ImmutableListBundleFactoryTest { afterCommitGetElementsShouldHaveAddedElements(Iterable> elems) { UncommittedBundle bundle = bundleFactory.createRootBundle(); Collection>> expectations = new ArrayList<>(); + Instant minElementTs = BoundedWindow.TIMESTAMP_MAX_VALUE; for (WindowedValue elem : elems) { bundle.add(elem); expectations.add(equalTo(elem)); + if (elem.getTimestamp().isBefore(minElementTs)) { + minElementTs = elem.getTimestamp(); + } } Matcher>> containsMatcher = Matchers.>containsInAnyOrder(expectations); - CommittedBundle committed = bundle.commit(Instant.now()); + Instant commitTime = Instant.now(); + CommittedBundle committed = bundle.commit(commitTime); assertThat(committed.getElements(), containsMatcher); + // Sanity check that the test is meaningful. + assertThat(minElementTs, not(equalTo(commitTime))); + assertThat(committed.getMinTimestamp(), equalTo(minElementTs)); + assertThat(committed.getSynchronizedProcessingOutputWatermark(), equalTo(commitTime)); + return committed; } @@ -149,6 +161,7 @@ public class ImmutableListBundleFactoryTest { assertThat( withed.getSynchronizedProcessingOutputWatermark(), equalTo(committed.getSynchronizedProcessingOutputWatermark())); + assertThat(withed.getMinTimestamp(), equalTo(new Instant(2048L))); } @Test