Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 391EC200BB6 for ; Fri, 4 Nov 2016 21:06:37 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 37996160AFE; Fri, 4 Nov 2016 20:06:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D8C6D160AEA for ; Fri, 4 Nov 2016 21:06:35 +0100 (CET) Received: (qmail 27073 invoked by uid 500); 4 Nov 2016 20:06:35 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 27064 invoked by uid 99); 4 Nov 2016 20:06:35 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Nov 2016 20:06:35 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 9358DC1444 for ; Fri, 4 Nov 2016 20:06:34 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 4MezGIMS1ofB for ; Fri, 4 Nov 2016 20:06:30 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 06BC85F1E9 for ; Fri, 4 Nov 2016 20:06:27 +0000 (UTC) Received: (qmail 26911 invoked by uid 99); 4 Nov 2016 20:06:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Nov 2016 20:06:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1D1BEDFE5C; Fri, 4 Nov 2016 20:06:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tgroh@apache.org To: commits@beam.incubator.apache.org Date: Fri, 04 Nov 2016 20:06:27 -0000 Message-Id: <7e2ad8f9d19a4ffc915fd84717146bf5@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] incubator-beam git commit: Flatten FiredTimers and ExtractFiredTimers archived-at: Fri, 04 Nov 2016 20:06:37 -0000 Flatten FiredTimers and ExtractFiredTimers Pass a single collection of fired timers, and have those objects contain the associated transform and key that they fired for. Timers already contain the domain they are in. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5dca2674 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5dca2674 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5dca2674 Branch: refs/heads/master Commit: 5dca2674a8d145c6e619005c2282c6064cd7aab7 Parents: 6e1e57b Author: Thomas Groh Authored: Thu Nov 3 14:10:37 2016 -0700 Committer: Thomas Groh Committed: Fri Nov 4 13:05:21 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/EvaluationContext.java | 6 +- .../direct/ExecutorServiceParallelExecutor.java | 41 ++++---- .../beam/runners/direct/WatermarkManager.java | 79 ++++++++------ .../runners/direct/EvaluationContextTest.java | 23 ++--- .../runners/direct/WatermarkManagerTest.java | 102 ++++++------------- 5 files changed, 109 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 965e77d..b814def 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -391,11 +391,9 @@ class EvaluationContext { *

This is a destructive operation. Timers will only appear in the result of this method once * for each time they are set. */ - public Map, Map, FiredTimers>> extractFiredTimers() { + public Collection extractFiredTimers() { forceRefresh(); - Map, Map, FiredTimers>> fired = - watermarkManager.extractFiredTimers(); - return fired; + return watermarkManager.extractFiredTimers(); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index e32f671..d1ffea1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; @@ -440,29 +439,23 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { */ private void fireTimers() throws Exception { try { - for (Map.Entry< - AppliedPTransform, Map, FiredTimers>> transformTimers : - evaluationContext.extractFiredTimers().entrySet()) { - AppliedPTransform transform = transformTimers.getKey(); - for (Map.Entry, FiredTimers> keyTimers : - transformTimers.getValue().entrySet()) { - for (TimeDomain domain : TimeDomain.values()) { - Collection delivery = keyTimers.getValue().getTimers(domain); - if (delivery.isEmpty()) { - continue; - } - KeyedWorkItem work = - KeyedWorkItems.timersWorkItem(keyTimers.getKey().getKey(), delivery); - @SuppressWarnings({"unchecked", "rawtypes"}) - CommittedBundle bundle = - evaluationContext - .createKeyedBundle(keyTimers.getKey(), (PCollection) transform.getInput()) - .add(WindowedValue.valueInGlobalWindow(work)) - .commit(evaluationContext.now()); - scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery)); - state.set(ExecutorState.ACTIVE); - } - } + for (FiredTimers transformTimers : evaluationContext.extractFiredTimers()) { + Collection delivery = transformTimers.getTimers(); + KeyedWorkItem work = + KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery); + @SuppressWarnings({"unchecked", "rawtypes"}) + CommittedBundle bundle = + evaluationContext + .createKeyedBundle( + transformTimers.getKey(), + (PCollection) transformTimers.getTransform().getInput()) + .add(WindowedValue.valueInGlobalWindow(work)) + .commit(evaluationContext.now()); + scheduleConsumption( + transformTimers.getTransform(), + bundle, + new TimerIterableCompletionCallback(delivery)); + state.set(ExecutorState.ACTIVE); } } catch (Exception e) { LOG.error("Internal Error while delivering timers", e); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 31b8091..f01c13c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -740,14 +740,17 @@ public class WatermarkManager { wms = new TransformWatermarks( - inputWatermark, outputWatermark, inputProcessingWatermark, outputProcessingWatermark); + transform, + inputWatermark, + outputWatermark, + inputProcessingWatermark, + outputProcessingWatermark); transformToWatermarks.put(transform, wms); } return wms; } - private Collection getInputProcessingWatermarks( - AppliedPTransform transform) { + private Collection getInputProcessingWatermarks(AppliedPTransform transform) { ImmutableList.Builder inputWmsBuilder = ImmutableList.builder(); Collection inputs = transform.getInput().expand(); if (inputs.isEmpty()) { @@ -924,15 +927,12 @@ public class WatermarkManager { * Returns a map of each {@link PTransform} that has pending timers to those timers. All of the * pending timers will be removed from this {@link WatermarkManager}. */ - public Map, Map, FiredTimers>> extractFiredTimers() { - Map, Map, FiredTimers>> allTimers = new HashMap<>(); + public Collection extractFiredTimers() { + Collection allTimers = new ArrayList<>(); for (Map.Entry, TransformWatermarks> watermarksEntry : transformToWatermarks.entrySet()) { - Map, FiredTimers> keyFiredTimers = - watermarksEntry.getValue().extractFiredTimers(); - if (!keyFiredTimers.isEmpty()) { - allTimers.put(watermarksEntry.getKey(), keyFiredTimers); - } + Collection firedTimers = watermarksEntry.getValue().extractFiredTimers(); + allTimers.addAll(firedTimers); } return allTimers; } @@ -1043,6 +1043,8 @@ public class WatermarkManager { * A reference to the input and output watermarks of an {@link AppliedPTransform}. */ public class TransformWatermarks { + private final AppliedPTransform transform; + private final AppliedPTransformInputWatermark inputWatermark; private final AppliedPTransformOutputWatermark outputWatermark; @@ -1053,10 +1055,12 @@ public class WatermarkManager { private Instant latestSynchronizedOutputWm; private TransformWatermarks( + AppliedPTransform transform, AppliedPTransformInputWatermark inputWatermark, AppliedPTransformOutputWatermark outputWatermark, SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark, SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) { + this.transform = transform; this.inputWatermark = inputWatermark; this.outputWatermark = outputWatermark; @@ -1128,7 +1132,7 @@ public class WatermarkManager { synchronizedProcessingInputWatermark.addPending(bundle); } - private Map, FiredTimers> extractFiredTimers() { + private Collection extractFiredTimers() { Map, List> eventTimeTimers = inputWatermark.extractFiredEventTimeTimers(); Map, List> processingTimers; @@ -1137,31 +1141,33 @@ public class WatermarkManager { TimeDomain.PROCESSING_TIME, clock.now()); synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers( TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime()); - Map, Map>> groupedTimers = new HashMap<>(); - groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers); - Map, FiredTimers> keyFiredTimers = new HashMap<>(); - for (Map.Entry, Map>> firedTimers : - groupedTimers.entrySet()) { - keyFiredTimers.put(firedTimers.getKey(), new FiredTimers(firedTimers.getValue())); + Map, List> timersPerKey = + groupFiredTimers(eventTimeTimers, processingTimers, synchronizedTimers); + Collection keyFiredTimers = new ArrayList<>(timersPerKey.size()); + for (Map.Entry, List> firedTimers : + timersPerKey.entrySet()) { + keyFiredTimers.add( + new FiredTimers(transform, firedTimers.getKey(), firedTimers.getValue())); } return keyFiredTimers; } @SafeVarargs - private final void groupFiredTimers( - Map, Map>> groupedToMutate, + private final Map, List> groupFiredTimers( Map, List>... timersToGroup) { + Map, List> groupedTimers = new HashMap<>(); for (Map, List> subGroup : timersToGroup) { for (Map.Entry, List> newTimers : subGroup.entrySet()) { - Map> grouped = groupedToMutate.get(newTimers.getKey()); + List grouped = groupedTimers.get(newTimers.getKey()); if (grouped == null) { - grouped = new HashMap<>(); - groupedToMutate.put(newTimers.getKey(), grouped); + grouped = new ArrayList<>(); + groupedTimers.put(newTimers.getKey(), grouped); } - grouped.put(newTimers.getValue().get(0).getDomain(), newTimers.getValue()); + grouped.addAll(newTimers.getValue()); } } + return groupedTimers; } private void updateTimers(TimerUpdate update) { @@ -1334,24 +1340,35 @@ public class WatermarkManager { * {@link WatermarkManager}. */ public static class FiredTimers { - private final Map> timers; + /** The transform the timers were set at and will be delivered to. */ + private final AppliedPTransform transform; + /** The key the timers were set for and will be delivered to. */ + private final StructuralKey key; + private final Collection timers; - private FiredTimers(Map> timers) { + private FiredTimers( + AppliedPTransform transform, StructuralKey key, Collection timers) { + this.transform = transform; + this.key = key; this.timers = timers; } + public AppliedPTransform getTransform() { + return transform; + } + + public StructuralKey getKey() { + return key; + } + /** * Gets all of the timers that have fired within the provided {@link TimeDomain}. If no timers * fired within the provided domain, return an empty collection. * *

Timers within a {@link TimeDomain} are guaranteed to be in order of increasing timestamp. */ - public Collection getTimers(TimeDomain domain) { - Collection domainTimers = timers.get(domain); - if (domainTimers == null) { - return Collections.emptyList(); - } - return domainTimers; + public Collection getTimers() { + return timers; } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index bc53570..e1277ac 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -373,36 +373,31 @@ public class EvaluationContextTest { .build(); // haven't added any timers, must be empty - assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + assertThat(context.extractFiredTimers(), emptyIterable()); context.handleResult( context.createKeyedBundle(key, created).commit(Instant.now()), ImmutableList.of(), timerResult); // timer hasn't fired - assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + assertThat(context.extractFiredTimers(), emptyIterable()); TransformResult advanceResult = StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); // Should cause the downstream timer to fire context.handleResult(null, ImmutableList.of(), advanceResult); - Map, Map, FiredTimers>> fired = - context.extractFiredTimers(); + Collection fired = context.extractFiredTimers(); assertThat( - fired, - Matchers.>hasKey(downstream.getProducingTransformInternal())); - Map, FiredTimers> downstreamFired = - fired.get(downstream.getProducingTransformInternal()); - assertThat(downstreamFired, Matchers.hasKey(key)); + Iterables.getOnlyElement(fired).getKey(), + Matchers.>equalTo(key)); - FiredTimers firedForKey = downstreamFired.get(key); - assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable()); - assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable()); - assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire)); + FiredTimers firedForKey = Iterables.getOnlyElement(fired); + // Contains exclusively the fired timer + assertThat(firedForKey.getTimers(), contains(toFire)); // Don't reextract timers - assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + assertThat(context.extractFiredTimers(), emptyIterable()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index 1954005..6bde462 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; @@ -68,6 +67,7 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.hamcrest.Matcher; +import org.hamcrest.Matchers; import org.joda.time.Instant; import org.joda.time.ReadableInstant; import org.junit.Before; @@ -915,12 +915,9 @@ public class WatermarkManagerTest implements Serializable { filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(earlierThan(initialFilteredDoubledWm))); - Map, Map, FiredTimers>> firedTimers = - manager.extractFiredTimers(); + Collection firedTimers = manager.extractFiredTimers(); assertThat( - firedTimers.get(filtered.getProducingTransformInternal()) - .get(key) - .getTimers(TimeDomain.PROCESSING_TIME), + Iterables.getOnlyElement(firedTimers).getTimers(), contains(pastTimer)); // Our timer has fired, but has not been completed, so it holds our synchronized processing WM assertThat(filteredWms.getSynchronizedProcessingOutputTime(), not(laterThan(startTime))); @@ -1099,10 +1096,9 @@ public class WatermarkManagerTest implements Serializable { @Test public void extractFiredTimersReturnsFiredEventTimeTimers() { - Map, Map, FiredTimers>> initialTimers = - manager.extractFiredTimers(); + Collection initialTimers = manager.extractFiredTimers(); // Watermarks haven't advanced - assertThat(initialTimers.entrySet(), emptyIterable()); + assertThat(initialTimers, emptyIterable()); // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle createdBundle = multiWindowedBundle(filtered); @@ -1136,15 +1132,11 @@ public class WatermarkManagerTest implements Serializable { new Instant(1000L)); manager.refreshAll(); - Map, Map, FiredTimers>> firstTransformFiredTimers = + Collection firstFiredTimers = manager.extractFiredTimers(); - assertThat( - firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map, FiredTimers> firstFilteredTimers = - firstTransformFiredTimers.get(filtered.getProducingTransformInternal()); - assertThat(firstFilteredTimers.get(key), not(nullValue())); - FiredTimers firstFired = firstFilteredTimers.get(key); - assertThat(firstFired.getTimers(TimeDomain.EVENT_TIME), contains(earliestTimer)); + assertThat(firstFiredTimers, not(Matchers.emptyIterable())); + FiredTimers firstFired = Iterables.getOnlyElement(firstFiredTimers); + assertThat(firstFired.getTimers(), contains(earliestTimer)); manager.updateWatermarks(null, TimerUpdate.empty(), @@ -1153,24 +1145,18 @@ public class WatermarkManagerTest implements Serializable { Collections.>emptyList()), new Instant(50_000L)); manager.refreshAll(); - Map, Map, FiredTimers>> secondTransformFiredTimers = - manager.extractFiredTimers(); - assertThat( - secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map, FiredTimers> secondFilteredTimers = - secondTransformFiredTimers.get(filtered.getProducingTransformInternal()); - assertThat(secondFilteredTimers.get(key), not(nullValue())); - FiredTimers secondFired = secondFilteredTimers.get(key); + Collection secondFiredTimers = manager.extractFiredTimers(); + assertThat(secondFiredTimers, not(Matchers.emptyIterable())); + FiredTimers secondFired = Iterables.getOnlyElement(secondFiredTimers); // Contains, in order, middleTimer and then lastTimer - assertThat(secondFired.getTimers(TimeDomain.EVENT_TIME), contains(middleTimer, lastTimer)); + assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer)); } @Test public void extractFiredTimersReturnsFiredProcessingTimeTimers() { - Map, Map, FiredTimers>> initialTimers = - manager.extractFiredTimers(); + Collection initialTimers = manager.extractFiredTimers(); // Watermarks haven't advanced - assertThat(initialTimers.entrySet(), emptyIterable()); + assertThat(initialTimers, emptyIterable()); // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle createdBundle = multiWindowedBundle(filtered); @@ -1204,15 +1190,10 @@ public class WatermarkManagerTest implements Serializable { new Instant(1000L)); manager.refreshAll(); - Map, Map, FiredTimers>> firstTransformFiredTimers = - manager.extractFiredTimers(); - assertThat( - firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map, FiredTimers> firstFilteredTimers = - firstTransformFiredTimers.get(filtered.getProducingTransformInternal()); - assertThat(firstFilteredTimers.get(key), not(nullValue())); - FiredTimers firstFired = firstFilteredTimers.get(key); - assertThat(firstFired.getTimers(TimeDomain.PROCESSING_TIME), contains(earliestTimer)); + Collection firstFiredTimers = manager.extractFiredTimers(); + assertThat(firstFiredTimers, not(Matchers.emptyIterable())); + FiredTimers firstFired = Iterables.getOnlyElement(firstFiredTimers); + assertThat(firstFired.getTimers(), contains(earliestTimer)); clock.set(new Instant(50_000L)); manager.updateWatermarks(null, @@ -1222,24 +1203,19 @@ public class WatermarkManagerTest implements Serializable { Collections.>emptyList()), new Instant(50_000L)); manager.refreshAll(); - Map, Map, FiredTimers>> secondTransformFiredTimers = + Collection secondFiredTimers = manager.extractFiredTimers(); - assertThat( - secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map, FiredTimers> secondFilteredTimers = - secondTransformFiredTimers.get(filtered.getProducingTransformInternal()); - assertThat(secondFilteredTimers.get(key), not(nullValue())); - FiredTimers secondFired = secondFilteredTimers.get(key); + assertThat(secondFiredTimers, not(Matchers.emptyIterable())); + FiredTimers secondFired = Iterables.getOnlyElement(secondFiredTimers); // Contains, in order, middleTimer and then lastTimer - assertThat(secondFired.getTimers(TimeDomain.PROCESSING_TIME), contains(middleTimer, lastTimer)); + assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer)); } @Test public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { - Map, Map, FiredTimers>> initialTimers = - manager.extractFiredTimers(); + Collection initialTimers = manager.extractFiredTimers(); // Watermarks haven't advanced - assertThat(initialTimers.entrySet(), emptyIterable()); + assertThat(initialTimers, emptyIterable()); // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle createdBundle = multiWindowedBundle(filtered); @@ -1273,16 +1249,11 @@ public class WatermarkManagerTest implements Serializable { new Instant(1000L)); manager.refreshAll(); - Map, Map, FiredTimers>> firstTransformFiredTimers = + Collection firstFiredTimers = manager.extractFiredTimers(); - assertThat( - firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map, FiredTimers> firstFilteredTimers = - firstTransformFiredTimers.get(filtered.getProducingTransformInternal()); - assertThat(firstFilteredTimers.get(key), not(nullValue())); - FiredTimers firstFired = firstFilteredTimers.get(key); - assertThat( - firstFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), contains(earliestTimer)); + assertThat(firstFiredTimers, not(Matchers.emptyIterable())); + FiredTimers firstFired = Iterables.getOnlyElement(firstFiredTimers); + assertThat(firstFired.getTimers(), contains(earliestTimer)); clock.set(new Instant(50_000L)); manager.updateWatermarks(null, @@ -1292,18 +1263,11 @@ public class WatermarkManagerTest implements Serializable { Collections.>emptyList()), new Instant(50_000L)); manager.refreshAll(); - Map, Map, FiredTimers>> secondTransformFiredTimers = - manager.extractFiredTimers(); - assertThat( - secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map, FiredTimers> secondFilteredTimers = - secondTransformFiredTimers.get(filtered.getProducingTransformInternal()); - assertThat(secondFilteredTimers.get(key), not(nullValue())); - FiredTimers secondFired = secondFilteredTimers.get(key); + Collection secondFiredTimers = manager.extractFiredTimers(); + assertThat(secondFiredTimers, not(Matchers.emptyIterable())); + FiredTimers secondFired = Iterables.getOnlyElement(secondFiredTimers); // Contains, in order, middleTimer and then lastTimer - assertThat( - secondFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), - contains(middleTimer, lastTimer)); + assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer)); } @Test