kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-6323: punctuate with WALL_CLOCK_TIME triggered immediately (#4301)
Date Wed, 31 Jan 2018 04:09:18 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 2cf0829  KAFKA-6323: punctuate with WALL_CLOCK_TIME triggered immediately (#4301)
2cf0829 is described below

commit 2cf0829c7c21eaf1c844f4deddbbdd8b1411341a
Author: fredfp <fredericarno@gmail.com>
AuthorDate: Wed Jan 31 10:18:55 2018 +0800

    KAFKA-6323: punctuate with WALL_CLOCK_TIME triggered immediately (#4301)
    
    This PR avoids unnecessary punctuation calls if punctuations are missed due to large time
advances. It also aligns punctuation schedules to the epoch.
    
    Author: Frederic Arno
    
    Reviewers: Michal Borowiecki <michal.borowiecki@openbet.com>, Guozhang Wang <wangguoz@gmail.com>,
Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>
---
 .../kafka/streams/processor/ProcessorContext.java  |  15 +-
 .../processor/internals/PunctuationSchedule.java   |  34 +++--
 .../streams/processor/internals/StreamTask.java    |  26 +++-
 .../processor/internals/PunctuationQueueTest.java  |  60 +++++++-
 .../processor/internals/StreamTaskTest.java        | 165 +++++++++++++++++++--
 5 files changed, 269 insertions(+), 31 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 385d641..f9969d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -102,11 +102,22 @@ public interface ProcessorContext {
      * <ul>
      *   <li>{@link PunctuationType#STREAM_TIME} - uses "stream time", which is advanced
by the processing of messages
      *   in accordance with the timestamp as extracted by the {@link TimestampExtractor}
in use.
+     *   The first punctuation will be triggered by the first record that is processed.
      *   <b>NOTE:</b> Only advanced if messages arrive</li>
      *   <li>{@link PunctuationType#WALL_CLOCK_TIME} - uses system time (the wall-clock
time),
      *   which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
-     *   independent of whether new messages arrive. <b>NOTE:</b> This is best
effort only as its granularity is limited
-     *   by how long an iteration of the processing loop takes to complete</li>
+     *   independent of whether new messages arrive.
+     *   The first punctuation will be triggered after interval has elapsed.
+     *   <b>NOTE:</b> This is best effort only as its granularity is limited
by how long an iteration of the
+     *   processing loop takes to complete</li>
+     * </ul>
+     *
+     * <b>Skipping punctuations:</b> Punctuations will not be triggered more
than once at any given timestamp.
+     * This means that "missed" punctuation will be skipped.
+     * It's possible to "miss" a punctuation if:
+     * <ul>
+     *   <li>with {@link PunctuationType#STREAM_TIME}, when stream time advances more
than interval</li>
+     *   <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too short interval,
...</li>
      * </ul>
      *
      * @param interval the time interval between punctuations
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index cf50005..9c0ec88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -27,12 +27,19 @@ public class PunctuationSchedule extends Stamped<ProcessorNode>
{
     // this Cancellable will be re-pointed at the successor schedule in next()
     private final RepointableCancellable cancellable;
 
-    PunctuationSchedule(ProcessorNode node, long interval, Punctuator punctuator) {
-        this(node, 0L, interval, punctuator, new RepointableCancellable());
+    PunctuationSchedule(final ProcessorNode node,
+                        final long time,
+                        final long interval,
+                        final Punctuator punctuator) {
+        this(node, time, interval, punctuator, new RepointableCancellable());
         cancellable.setSchedule(this);
     }
 
-    private PunctuationSchedule(ProcessorNode node, long time, long interval, Punctuator
punctuator, RepointableCancellable cancellable) {
+    private PunctuationSchedule(final ProcessorNode node,
+                                final long time,
+                                final long interval,
+                                final Punctuator punctuator,
+                                final RepointableCancellable cancellable) {
         super(node, time);
         this.interval = interval;
         this.punctuator = punctuator;
@@ -59,14 +66,19 @@ public class PunctuationSchedule extends Stamped<ProcessorNode>
{
         return isCancelled;
     }
 
-    public PunctuationSchedule next(long currTimestamp) {
-        PunctuationSchedule nextSchedule;
-        // we need to special handle the case when it is firstly triggered (i.e. the timestamp
-        // is equal to the interval) by reschedule based on the currTimestamp
-        if (timestamp == 0L)
-            nextSchedule = new PunctuationSchedule(value, currTimestamp + interval, interval,
punctuator, cancellable);
-        else
-            nextSchedule = new PunctuationSchedule(value, timestamp + interval, interval,
punctuator, cancellable);
+    public PunctuationSchedule next(final long currTimestamp) {
+        long nextPunctuationTime = timestamp + interval;
+        if (currTimestamp >= nextPunctuationTime) {
+            // we missed one or more punctuations
+            // avoid scheduling a new punctuation immediately, this can happen:
+            // - when using STREAM_TIME punctuation and there was a gap i.e., no data was
+            //   received for at least 2*interval
+            // - when using WALL_CLOCK_TIME and there was a gap i.e., punctuation was delayed
for at least 2*interval (GC pause, overload, ...)
+            final long intervalsMissed = (currTimestamp - timestamp) / interval;
+            nextPunctuationTime = timestamp + (intervalsMissed + 1) * interval;
+        }
+
+        final PunctuationSchedule nextSchedule = new PunctuationSchedule(value, nextPunctuationTime,
interval, punctuator, cancellable);
 
         cancellable.setSchedule(nextSchedule);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8766e64..70ece60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -569,16 +569,40 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * @throws IllegalStateException if the current node is not null
      */
     public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator
punctuator) {
+        switch (type) {
+            case STREAM_TIME:
+                // align punctuation to 0L, punctuate as soon as we have data
+                return schedule(0L, interval, type, punctuator);
+            case WALL_CLOCK_TIME:
+                // align punctuation to now, punctuate after interval has elapsed
+                return schedule(time.milliseconds() + interval, interval, type, punctuator);
+            default:
+                throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
+        }
+    }
+
+    /**
+     * Schedules a punctuation for the processor
+     *
+     * @param startTime time of the first punctuation
+     * @param interval the interval in milliseconds
+     * @param type the punctuation type
+     * @throws IllegalStateException if the current node is not null
+     */
+    Cancellable schedule(final long startTime, final long interval, final PunctuationType
type, final Punctuator punctuator) {
         if (processorContext.currentNode() == null) {
             throw new IllegalStateException(String.format("%sCurrent node is null", logPrefix));
         }
 
-        final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(),
interval, punctuator);
+        final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(),
startTime, interval, punctuator);
 
         switch (type) {
             case STREAM_TIME:
+                // STREAM_TIME punctuation is data driven, will first punctuate as soon as
stream-time is known and >= time,
+                // stream-time is known when we have received at least one record from each
input topic
                 return streamTimePunctuationQueue.schedule(schedule);
             case WALL_CLOCK_TIME:
+                // WALL_CLOCK_TIME is driven by the wall clock time, will first punctuate
when now >= time
                 return systemTimePunctuationQueue.schedule(schedule);
             default:
                 throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index 1570c9b..09c7a0a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -40,7 +40,7 @@ public class PunctuationQueueTest {
             }
         };
 
-        final PunctuationSchedule sched = new PunctuationSchedule(node, 100L, punctuator);
+        final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator);
         final long now = sched.timestamp - 100L;
 
         queue.schedule(sched);
@@ -66,6 +66,64 @@ public class PunctuationQueueTest {
 
         queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
         assertEquals(2, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1001L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(3, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1002L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(3, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(4, processor.punctuatedAt.size());
+    }
+
+    @Test
+    public void testPunctuationIntervalCustomAlignment() {
+        final TestProcessor processor = new TestProcessor();
+        final ProcessorNode<String, String> node = new ProcessorNode<>("test",
processor, null);
+        final PunctuationQueue queue = new PunctuationQueue();
+        final Punctuator punctuator = new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                node.processor().punctuate(timestamp);
+            }
+        };
+
+        final PunctuationSchedule sched = new PunctuationSchedule(node, 50L, 100L, punctuator);
+        final long now = sched.timestamp - 50L;
+
+        queue.schedule(sched);
+
+        ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
+            @Override
+            public void punctuate(ProcessorNode node, long time, PunctuationType type, Punctuator
punctuator) {
+                punctuator.punctuate(time);
+            }
+        };
+
+        queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(0, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 49L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(0, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 50L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(1, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 149L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(1, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(2, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1051L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(3, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1052L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(3, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 1150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(4, processor.punctuatedAt.size());
     }
 
     private static class TestProcessor extends AbstractProcessor<String, String> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 4bbd0d6..2b3865a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -309,63 +309,169 @@ public class StreamTaskTest {
     @Test
     public void testMaybePunctuateStreamTime() {
         task.addRecords(partition1, records(
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 32,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 60,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
         ));
 
         task.addRecords(partition2, records(
                 new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 61,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
         ));
 
         assertTrue(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
-        assertEquals(5, task.numBuffered());
+        assertEquals(8, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
+        assertTrue(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(7, task.numBuffered());
+        assertEquals(2, source1.numReceived);
+        assertEquals(0, source2.numReceived);
+
         assertFalse(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
-        assertEquals(4, task.numBuffered());
-        assertEquals(1, source1.numReceived);
+        assertEquals(6, task.numBuffered());
+        assertEquals(2, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
         assertTrue(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
+        assertEquals(5, task.numBuffered());
+        assertEquals(3, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(4, task.numBuffered());
+        assertEquals(3, source1.numReceived);
+        assertEquals(2, source2.numReceived);
+
+        assertTrue(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
         assertEquals(3, task.numBuffered());
+        assertEquals(4, source1.numReceived);
+        assertEquals(2, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(2, task.numBuffered());
+        assertEquals(4, source1.numReceived);
+        assertEquals(3, source2.numReceived);
+
+        assertTrue(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(1, task.numBuffered());
+        assertEquals(5, source1.numReceived);
+        assertEquals(3, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(0, task.numBuffered());
+        assertEquals(5, source1.numReceived);
+        assertEquals(4, source2.numReceived);
+
+        assertFalse(task.process());
+        assertFalse(task.maybePunctuateStreamTime());
+
+        processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
0L, 20L, 32L, 40L, 60L);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldPunctuateOnceStreamTimeAfterGap() {
+        task.addRecords(partition1, records(
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 142,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 155,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 160,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
+
+        task.addRecords(partition2, records(
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 145,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 159,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 161,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
+
+        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 20
+
+        assertTrue(task.process());
+        assertEquals(7, task.numBuffered());
+        assertEquals(1, source1.numReceived);
+        assertEquals(0, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(6, task.numBuffered());
+        assertEquals(1, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 142
+
+        // only one punctuation after 100ms gap
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(5, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
         assertFalse(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
-        assertEquals(2, task.numBuffered());
+        assertEquals(4, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(2, source2.numReceived);
 
-        assertTrue(task.maybePunctuateStreamTime());
+        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 155
 
         assertTrue(task.process());
-        assertEquals(1, task.numBuffered());
+        assertEquals(3, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(2, source2.numReceived);
 
         assertFalse(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
-        assertEquals(0, task.numBuffered());
+        assertEquals(2, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(3, source2.numReceived);
 
+        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 160, still aligned on
the initial punctuation
+
+        assertTrue(task.process());
+        assertEquals(1, task.numBuffered());
+        assertEquals(4, source1.numReceived);
+        assertEquals(3, source2.numReceived);
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+        assertEquals(0, task.numBuffered());
+        assertEquals(4, source1.numReceived);
+        assertEquals(4, source2.numReceived);
+
         assertFalse(task.process());
         assertFalse(task.maybePunctuateStreamTime());
 
-        processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
20L, 30L, 40L);
+        processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
20L, 142L, 155L, 160L);
     }
 
     @SuppressWarnings("unchecked")
@@ -405,18 +511,45 @@ public class StreamTaskTest {
         assertTrue(task.maybePunctuateSystemTime());
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
-        time.sleep(10);
+        time.sleep(9);
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(1);
         assertTrue(task.maybePunctuateSystemTime());
-        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now + 10, now + 20, now + 30);
+        time.sleep(20);
+        assertTrue(task.maybePunctuateSystemTime());
+        assertFalse(task.maybePunctuateSystemTime());
+        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now + 10, now + 20, now + 30, now + 50);
     }
 
     @Test
     public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
-        long now = time.milliseconds();
-        assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate
+        assertFalse(task.maybePunctuateSystemTime());
         time.sleep(9);
         assertFalse(task.maybePunctuateSystemTime());
-        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now);
+        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME);
+    }
+
+    @Test
+    public void shouldPunctuateOnceSystemTimeAfterGap() {
+        long now = time.milliseconds();
+        time.sleep(100);
+        assertTrue(task.maybePunctuateSystemTime());
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(10);
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(12);
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(7);
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(1); // punctuate at now + 130
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(105); // punctuate at now + 235
+        assertTrue(task.maybePunctuateSystemTime());
+        assertFalse(task.maybePunctuateSystemTime());
+        time.sleep(5); // punctuate at now + 240, still aligned on the initial punctuation
+        assertTrue(task.maybePunctuateSystemTime());
+        assertFalse(task.maybePunctuateSystemTime());
+        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now + 100, now + 110, now + 122, now + 130, now + 235, now + 240);
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message