kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-3514: Remove min timestamp tracker (#5382)
Date Thu, 19 Jul 2018 15:06:22 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2f6240a  KAFKA-3514: Remove min timestamp tracker (#5382)
2f6240a is described below

commit 2f6240ac944f2a55ece50a179b79f4e2ee63a621
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Jul 19 08:06:17 2018 -0700

    KAFKA-3514: Remove min timestamp tracker (#5382)
    
    1. Remove MinTimestampTracker and its TimestampTracker interface.
    2. In RecordQueue, keep track of the head record (deserialized) while keeping the remaining raw-bytes records in the FIFO queue; the head record, as well as the partition timestamp, is updated accordingly.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
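
For intuition, point 2 above describes a lazy-deserialization pattern: only the head record is deserialized and stamped, while the remaining records stay as raw bytes until they are promoted to the head. Below is a minimal, self-contained sketch of that pattern (hypothetical class and method names, with a parsed long standing in for a deserialized record; not the actual RecordQueue code, which appears in the diff further down):

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayDeque;

    // Sketch of the head-record pattern: only the head is deserialized;
    // the rest stay raw until promoted.
    class HeadTrackingQueue {
        private final ArrayDeque<byte[]> rawRecords = new ArrayDeque<>();
        private Long head = null;          // stands in for the StampedRecord head
        private long partitionTime = -1L;  // mirrors RecordQueue.NOT_KNOWN

        void addRaw(final byte[] raw) {
            rawRecords.addLast(raw);
            maybePromoteHead();            // mirrors maybeUpdateTimestamp()
        }

        Long poll() {
            final Long recordToReturn = head;
            head = null;
            maybePromoteHead();
            return recordToReturn;
        }

        int size() {
            // the promoted head still counts as buffered
            return rawRecords.size() + (head == null ? 0 : 1);
        }

        private void maybePromoteHead() {
            while (head == null && !rawRecords.isEmpty()) {
                // "deserialize" and extract a timestamp from the raw bytes
                final long ts = Long.parseLong(
                    new String(rawRecords.pollFirst(), StandardCharsets.UTF_8));
                if (ts < 0) {
                    continue;              // drop records with invalid timestamps
                }
                head = ts;
                if (ts > partitionTime) {
                    partitionTime = ts;    // partition time only ever advances
                }
            }
        }
    }

One consequence, visible in the diff: size() and isEmpty() must account for the promoted head as well as the raw deque.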
---
 .../processor/internals/MinTimestampTracker.java   |  84 -------------
 .../processor/internals/PartitionGroup.java        |   2 +-
 .../streams/processor/internals/RecordQueue.java   | 136 +++++++++------------
 .../processor/internals/StandbyContextImpl.java    |   3 +-
 .../streams/processor/internals/StreamTask.java    |   2 +-
 .../processor/internals/TimestampTracker.java      |  61 ---------
 .../internals/MinTimestampTrackerTest.java         |  78 ------------
 .../processor/internals/PartitionGroupTest.java    |  46 +++----
 .../processor/internals/RecordQueueTest.java       |  26 ++--
 9 files changed, 96 insertions(+), 342 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
deleted file mode 100644
index df35c3d..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import java.util.LinkedList;
-
-/**
- * MinTimestampTracker implements {@link TimestampTracker} that maintains the min
- * timestamp of the maintained stamped elements.
- */
-public class MinTimestampTracker<E> implements TimestampTracker<E> {
-
-    // first element has the lowest timestamp and last element the highest
-    private final LinkedList<Stamped<E>> ascendingSubsequence = new LinkedList<>();
-
-    // in the case that incoming traffic is very small, the records maybe put and polled
-    // within a single iteration, in this case we need to remember the last polled
-    // record's timestamp
-    private long lastKnownTime = NOT_KNOWN;
-
-    /**
-     * @throws NullPointerException if the element is null
-     */
-    public void addElement(final Stamped<E> elem) {
-        if (elem == null) throw new NullPointerException();
-
-        Stamped<E> maxElem = ascendingSubsequence.peekLast();
-        while (maxElem != null && maxElem.timestamp >= elem.timestamp) {
-            ascendingSubsequence.removeLast();
-            maxElem = ascendingSubsequence.peekLast();
-        }
-        ascendingSubsequence.offerLast(elem); //lower timestamps have been retained and all greater/equal removed
-    }
-
-    public void removeElement(final Stamped<E> elem) {
-        if (elem == null) {
-            return;
-        }
-
-        if (ascendingSubsequence.peekFirst() == elem) {
-            ascendingSubsequence.removeFirst();
-        }
-
-        if (ascendingSubsequence.isEmpty()) {
-            lastKnownTime = elem.timestamp;
-        }
-
-    }
-
-    public int size() {
-        return ascendingSubsequence.size();
-    }
-
-    /**
-     * @return the lowest tracked timestamp
-     */
-    public long get() {
-        Stamped<E> stamped = ascendingSubsequence.peekFirst();
-
-        if (stamped == null)
-            return lastKnownTime;
-        else
-            return stamped.timestamp;
-    }
-
-    public void clear() {
-        lastKnownTime = NOT_KNOWN;
-        ascendingSubsequence.clear();
-    }
-}
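
For reference, the class deleted above was a textbook monotonic deque: addElement evicts every tail entry with a greater-or-equal timestamp before appending, so the deque stays strictly ascending and its head is always the minimum timestamp among live elements, at amortized O(1) per operation. A simplified, self-contained restatement (plain timestamps instead of Stamped elements; a sketch, not Kafka code):

    import java.util.ArrayDeque;

    // Simplified restatement of the deleted tracker: an ascending deque whose
    // head is the minimum timestamp among elements added but not yet removed.
    class MinTracker {
        private final ArrayDeque<Long> ascending = new ArrayDeque<>();
        private long lastKnownTime = -1L; // NOT_KNOWN

        void add(final long ts) {
            // evict tail entries >= ts; they can never be the minimum again
            while (!ascending.isEmpty() && ascending.peekLast() >= ts) {
                ascending.removeLast();
            }
            ascending.offerLast(ts);
        }

        void remove(final long ts) {
            // elements leave in FIFO order, so only the head can match
            if (!ascending.isEmpty() && ascending.peekFirst() == ts) {
                ascending.removeFirst();
            }
            if (ascending.isEmpty()) {
                lastKnownTime = ts; // remember the last removed timestamp
            }
        }

        long get() {
            return ascending.isEmpty() ? lastKnownTime : ascending.peekFirst();
        }
    }

Adding 100, 99, 102 and calling get() yields 99, matching shouldReturnLowestAvailableTimestampFromAllInputs in the test file deleted further below.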
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index c809da9..34252bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -58,7 +58,7 @@ public class PartitionGroup {
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
         this.partitionQueues = partitionQueues;
         totalBuffered = 0;
-        streamTime = -1;
+        streamTime = RecordQueue.NOT_KNOWN;
     }
 
     /**
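
For context on the hunk above: PartitionGroup keeps its non-empty queues in a PriorityQueue ordered by RecordQueue::timestamp, so nextRecord always drains the partition whose head record has the smallest timestamp. A minimal analogue of that ordering (hypothetical Src class; the real comparator is the one shown in the hunk):

    import java.util.Comparator;
    import java.util.PriorityQueue;

    // Minimal analogue of nonEmptyQueuesByTime: polling always yields the
    // source with the smallest current timestamp.
    public class ByTimeDemo {
        static final class Src {
            final String name;
            final long time;
            Src(final String name, final long time) {
                this.name = name;
                this.time = time;
            }
            long timestamp() { return time; }
        }

        public static void main(final String[] args) {
            final PriorityQueue<Src> byTime =
                new PriorityQueue<>(Comparator.comparingLong(Src::timestamp));
            byTime.add(new Src("partition-1", 5L));
            byTime.add(new Src("partition-2", 4L));
            System.out.println(byTime.poll().name); // partition-2: lowest first
        }
    }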
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 22ef4d6..86340bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -35,16 +35,19 @@ import java.util.ArrayDeque;
  * timestamp is monotonically increasing such that once it is advanced, it will not be decremented.
  */
 public class RecordQueue {
+
+    static final long NOT_KNOWN = -1L;
+
+    private final Logger log;
     private final SourceNode source;
-    private final TimestampExtractor timestampExtractor;
     private final TopicPartition partition;
-    private final ArrayDeque<StampedRecord> fifoQueue;
-    private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
-    private final RecordDeserializer recordDeserializer;
     private final ProcessorContext processorContext;
-    private final Logger log;
+    private final TimestampExtractor timestampExtractor;
+    private final RecordDeserializer recordDeserializer;
+    private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
 
-    private long partitionTime = TimestampTracker.NOT_KNOWN;
+    private long partitionTime = NOT_KNOWN;
+    private StampedRecord headRecord = null;
 
     RecordQueue(final TopicPartition partition,
                 final SourceNode source,
@@ -52,11 +55,10 @@ public class RecordQueue {
                 final DeserializationExceptionHandler deserializationExceptionHandler,
                 final InternalProcessorContext processorContext,
                 final LogContext logContext) {
-        this.partition = partition;
         this.source = source;
-        this.timestampExtractor = timestampExtractor;
+        this.partition = partition;
         this.fifoQueue = new ArrayDeque<>();
-        this.timeTracker = new MinTimestampTracker<>();
+        this.timestampExtractor = timestampExtractor;
         this.recordDeserializer = new RecordDeserializer(
             source,
             deserializationExceptionHandler,
@@ -93,48 +95,10 @@ public class RecordQueue {
      */
     int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
         for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-
-            final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord);
-            if (record == null) {
-                // this only happens if the deserializer decides to skip. It has already logged the reason.
-                continue;
-            }
-
-            final long timestamp;
-            try {
-                timestamp = timestampExtractor.extract(record, timeTracker.get());
-            } catch (final StreamsException internalFatalExtractorException) {
-                throw internalFatalExtractorException;
-            } catch (final Exception fatalUserException) {
-                throw new StreamsException(
-                    String.format("Fatal user code error in TimestampExtractor callback for record %s.", record),
-                    fatalUserException);
-            }
-            log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record);
-
-            // drop message if TS is invalid, i.e., negative
-            if (timestamp < 0) {
-                log.warn(
-                    "Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}]
offset=[{}] extractedTimestamp=[{}] extractor=[{}]",
-                    record.topic(), record.partition(), record.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
-                );
-                ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record();
-                continue;
-            }
-
-            final StampedRecord stampedRecord = new StampedRecord(record, timestamp);
-            fifoQueue.addLast(stampedRecord);
-            timeTracker.addElement(stampedRecord);
+            fifoQueue.addLast(rawRecord);
         }
 
-        // update the partition timestamp if its currently
-        // tracked min timestamp has exceed its value; this will
-        // usually only take effect for the first added batch
-        final long timestamp = timeTracker.get();
-
-        if (timestamp > partitionTime) {
-            partitionTime = timestamp;
-        }
+        maybeUpdateTimestamp();
 
         return size();
     }
@@ -145,23 +109,12 @@ public class RecordQueue {
      * @return StampedRecord
      */
     public StampedRecord poll() {
-        final StampedRecord elem = fifoQueue.pollFirst();
-
-        if (elem == null) {
-            return null;
-        }
-
-        timeTracker.removeElement(elem);
+        final StampedRecord recordToReturn = headRecord;
+        headRecord = null;
 
-        // only advance the partition timestamp if its currently
-        // tracked min timestamp has exceeded its value
-        final long timestamp = timeTracker.get();
+        maybeUpdateTimestamp();
 
-        if (timestamp > partitionTime) {
-            partitionTime = timestamp;
-        }
-
-        return elem;
+        return recordToReturn;
     }
 
     /**
@@ -170,7 +123,8 @@ public class RecordQueue {
      * @return the number of records
      */
     public int size() {
-        return fifoQueue.size();
+        // plus one deserialized head record for timestamp tracking
+        return fifoQueue.size() + (headRecord == null ? 0 : 1);
     }
 
     /**
@@ -179,7 +133,7 @@ public class RecordQueue {
      * @return true if the queue is empty, otherwise false
      */
     public boolean isEmpty() {
-        return fifoQueue.isEmpty();
+        return fifoQueue.isEmpty() && headRecord == null;
     }
 
     /**
@@ -196,16 +150,48 @@ public class RecordQueue {
      */
     public void clear() {
         fifoQueue.clear();
-        timeTracker.clear();
-        partitionTime = TimestampTracker.NOT_KNOWN;
+        headRecord = null;
+        partitionTime = NOT_KNOWN;
     }
 
-    /*
-     * Returns the timestamp tracker of the record queue
-     *
-     * This is only used for testing
-     */
-    TimestampTracker<ConsumerRecord<Object, Object>> timeTracker() {
-        return timeTracker;
+    private void maybeUpdateTimestamp() {
+        while (headRecord == null && !fifoQueue.isEmpty()) {
+            final ConsumerRecord<byte[], byte[]> raw = fifoQueue.pollFirst();
+            final ConsumerRecord<Object, Object> deserialized = recordDeserializer.deserialize(processorContext, raw);
+
+            if (deserialized == null) {
+                // this only happens if the deserializer decides to skip. It has already logged the reason.
+                continue;
+            }
+
+            final long timestamp;
+            try {
+                timestamp = timestampExtractor.extract(deserialized, partitionTime);
+            } catch (final StreamsException internalFatalExtractorException) {
+                throw internalFatalExtractorException;
+            } catch (final Exception fatalUserException) {
+                throw new StreamsException(
+                        String.format("Fatal user code error in TimestampExtractor callback for record %s.", deserialized),
+                        fatalUserException);
+            }
+            log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, deserialized);
+
+            // drop message if TS is invalid, i.e., negative
+            if (timestamp < 0) {
+                log.warn(
+                        "Skipping record due to negative extracted timestamp. topic=[{}]
partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]",
+                        deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
+                );
+                ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record();
+                continue;
+            }
+
+            headRecord = new StampedRecord(deserialized, timestamp);
+
+            // update the partition timestamp if the current head record's timestamp has exceeded its value
+            if (timestamp > partitionTime) {
+                partitionTime = timestamp;
+            }
+        }
     }
 }
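
The behavioral consequence of this rewrite: the queue's timestamp is now the maximum timestamp observed at the head so far (advancing monotonically), rather than the minimum over everything buffered. A self-contained, hypothetical replay of the 2, 1, 3 sequence exercised by RecordQueueTest further below:

    import java.util.ArrayDeque;

    // Hypothetical replay of adding records with timestamps 2, 1, 3 and then
    // polling them, under the new head-record semantics.
    public class PartitionTimeTrace {
        public static void main(final String[] args) {
            final ArrayDeque<Long> fifo = new ArrayDeque<>();
            for (final long ts : new long[] {2L, 1L, 3L}) {
                fifo.addLast(ts);
            }
            long partitionTime = -1L;      // RecordQueue.NOT_KNOWN
            Long head = fifo.pollFirst();  // addRawRecords promotes the first head
            partitionTime = Math.max(partitionTime, head);
            while (head != null) {
                final long polled = head;
                head = fifo.pollFirst();   // poll() promotes the next raw record
                if (head != null) {
                    partitionTime = Math.max(partitionTime, head);
                }
                System.out.printf("polled=%d timestamp()=%d%n", polled, partitionTime);
            }
            // prints: polled=2 timestamp()=2
            //         polled=1 timestamp()=3
            //         polled=3 timestamp()=3
        }
    }

These are exactly the values the updated RecordQueueTest asserts (2L, then 3L), where the old tracker-based code reported the buffered minimum (1L) instead.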
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index b01fd5b..4c06c39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -69,7 +69,8 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
             return Collections.emptyMap();
         }
     };
-    private long streamTime = TimestampTracker.NOT_KNOWN;
+
+    private long streamTime = RecordQueue.NOT_KNOWN;
 
     StandbyContextImpl(final TaskId id,
                        final StreamsConfig config,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6af4c4b..7f121fe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -716,7 +716,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
         // if the timestamp is not known yet, meaning there is not enough data accumulated
         // to reason stream partition time, then skip.
-        if (timestamp == TimestampTracker.NOT_KNOWN) {
+        if (timestamp == RecordQueue.NOT_KNOWN) {
             return false;
         } else {
             return streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
deleted file mode 100644
index 30c816d..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-/**
- * TimestampTracker is a helper class for a sliding window implementation.
- * It is assumed that stamped elements are added or removed in a FIFO manner.
- * It maintains the timestamp, such as the min timestamp, the max timestamp, etc.
- * of stamped elements that were added but not yet removed.
- */
-public interface TimestampTracker<E> {
-
-    long NOT_KNOWN = -1L;
-
-    /**
-     * Adds a stamped elements to this tracker.
-     *
-     * @param elem the added element
-     */
-    void addElement(Stamped<E> elem);
-
-    /**
-     * Removed a stamped elements to this tracker.
-     *
-     * @param elem the removed element
-     */
-    void removeElement(Stamped<E> elem);
-
-    /**
-     * Returns the current tracked timestamp
-     *
-     * @return timestamp, or {@link #NOT_KNOWN} when empty
-     */
-    long get();
-
-    /**
-     * Returns the size of internal structure. The meaning of "size" depends on the implementation.
-     *
-     * @return size
-     */
-    int size();
-
-    /**
-     * Empty the tracker by removing any tracked stamped elements
-     */
-    void clear();
-}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
deleted file mode 100644
index 24653e6..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import org.junit.Test;
-
-public class MinTimestampTrackerTest {
-
-    private MinTimestampTracker<String> tracker = new MinTimestampTracker<>();
-
-    @Test
-    public void shouldReturnNotKnownTimestampWhenNoRecordsEverAdded() {
-        assertThat(tracker.get(), equalTo(TimestampTracker.NOT_KNOWN));
-    }
-
-    @Test
-    public void shouldReturnTimestampOfOnlyRecord() {
-        tracker.addElement(elem(100));
-        assertThat(tracker.get(), equalTo(100L));
-    }
-
-    @Test
-    public void shouldReturnLowestAvailableTimestampFromAllInputs() {
-        tracker.addElement(elem(100));
-        tracker.addElement(elem(99));
-        tracker.addElement(elem(102));
-        assertThat(tracker.get(), equalTo(99L));
-    }
-
-    @Test
-    public void shouldReturnLowestAvailableTimestampAfterPreviousLowestRemoved() {
-        final Stamped<String> lowest = elem(88);
-        tracker.addElement(lowest);
-        tracker.addElement(elem(101));
-        tracker.addElement(elem(99));
-        tracker.removeElement(lowest);
-        assertThat(tracker.get(), equalTo(99L));
-    }
-
-    @Test
-    public void shouldReturnLastKnownTimestampWhenAllElementsHaveBeenRemoved() {
-        final Stamped<String> record = elem(98);
-        tracker.addElement(record);
-        tracker.removeElement(record);
-        assertThat(tracker.get(), equalTo(98L));
-    }
-
-    @Test
-    public void shouldIgnoreNullRecordOnRemove() {
-        tracker.removeElement(null);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerExceptionWhenTryingToAddNullElement() {
-        tracker.addElement(null);
-    }
-
-    private Stamped<String> elem(final long timestamp) {
-        return new Stamped<>("", timestamp);
-    }
-}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 9823ae1..b3123e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -140,49 +140,49 @@ public class PartitionGroupTest {
         record = group.nextRecord(info);
         // 1:[5, 2, 4]
         // 2:[4, 6]
-        // st: 3 (2's presence prevents it from advancing to 4)
+        // st: 4 as partition st is now {5, 4}
         assertEquals(partition1, info.partition());
         assertEquals(3L, record.timestamp);
         assertEquals(5, group.numBuffered());
         assertEquals(3, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
-        assertEquals(3L, group.timestamp());
+        assertEquals(4L, group.timestamp());
 
         // get one record, time should not be advanced
         record = group.nextRecord(info);
-        // 1:[2, 4]
-        // 2:[4, 6]
-        // st: 3 (2's presence prevents it from advancing to 4)
-        assertEquals(partition1, info.partition());
-        assertEquals(5L, record.timestamp);
+        // 1:[5, 2, 4]
+        // 2:[6]
+        // st: 5 as partition st is now {5, 6}
+        assertEquals(partition2, info.partition());
+        assertEquals(4L, record.timestamp);
         assertEquals(4, group.numBuffered());
-        assertEquals(2, group.numBuffered(partition1));
-        assertEquals(2, group.numBuffered(partition2));
-        assertEquals(3L, group.timestamp());
+        assertEquals(3, group.numBuffered(partition1));
+        assertEquals(1, group.numBuffered(partition2));
+        assertEquals(5L, group.timestamp());
 
         // get one more record, now time should be advanced
         record = group.nextRecord(info);
-        // 1:[4]
-        // 2:[4, 6]
-        // st: 4
+        // 1:[2, 4]
+        // 2:[6]
+        // st: 5
         assertEquals(partition1, info.partition());
-        assertEquals(2L, record.timestamp);
+        assertEquals(5L, record.timestamp);
         assertEquals(3, group.numBuffered());
-        assertEquals(1, group.numBuffered(partition1));
-        assertEquals(2, group.numBuffered(partition2));
-        assertEquals(4L, group.timestamp());
+        assertEquals(2, group.numBuffered(partition1));
+        assertEquals(1, group.numBuffered(partition2));
+        assertEquals(5L, group.timestamp());
 
         // get one more record, time should not be advanced
         record = group.nextRecord(info);
         // 1:[4]
         // 2:[6]
-        // st: 4
-        assertEquals(partition2, info.partition());
-        assertEquals(4L, record.timestamp);
+        // st: 5
+        assertEquals(partition1, info.partition());
+        assertEquals(2L, record.timestamp);
         assertEquals(2, group.numBuffered());
         assertEquals(1, group.numBuffered(partition1));
         assertEquals(1, group.numBuffered(partition2));
-        assertEquals(4L, group.timestamp());
+        assertEquals(5L, group.timestamp());
 
         // get one more record, time should not be advanced
         record = group.nextRecord(info);
@@ -194,7 +194,7 @@ public class PartitionGroupTest {
         assertEquals(1, group.numBuffered());
         assertEquals(0, group.numBuffered(partition1));
         assertEquals(1, group.numBuffered(partition2));
-        assertEquals(4L, group.timestamp());
+        assertEquals(5L, group.timestamp());
 
         // get one more record, time should not be advanced
         record = group.nextRecord(info);
@@ -206,7 +206,7 @@ public class PartitionGroupTest {
         assertEquals(0, group.numBuffered());
         assertEquals(0, group.numBuffered(partition1));
         assertEquals(0, group.numBuffered(partition2));
-        assertEquals(4L, group.timestamp());
+        assertEquals(5L, group.timestamp());
 
     }
 }
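
A hedged reading of the updated expectations above: each partition's time now advances as the maximum timestamp its head records have carried, and the group's stream time appears to be the minimum across the per-partition times (hence comments like "st: 4 as partition st is now {5, 4}"). In miniature:

    // Hypothetical restatement of the expectation: stream time is the minimum
    // across per-partition times, each of which only ever advances.
    public class StreamTimeDemo {
        public static void main(final String[] args) {
            final long p1Time = 5L; // partition 1 after its head reached ts=5
            final long p2Time = 4L; // partition 2 after its head reached ts=4
            System.out.println(Math.min(p1Time, p2Time)); // 4 == group.timestamp()
        }
    }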
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 3ed9e3b..cf1d63f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -102,7 +102,7 @@ public class RecordQueueTest {
 
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
+        assertEquals(RecordQueue.NOT_KNOWN, queue.timestamp());
 
         // add three 3 out-of-order records with timestamp 2, 1, 3
         final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
@@ -113,20 +113,17 @@ public class RecordQueueTest {
         queue.addRawRecords(list1);
 
         assertEquals(3, queue.size());
-        assertEquals(1L, queue.timestamp());
-        assertEquals(2, queue.timeTracker().size());
+        assertEquals(2L, queue.timestamp());
 
         // poll the first record, now with 1, 3
         assertEquals(2L, queue.poll().timestamp);
         assertEquals(2, queue.size());
-        assertEquals(1L, queue.timestamp());
-        assertEquals(2, queue.timeTracker().size());
+        assertEquals(2L, queue.timestamp());
 
         // poll the second record, now with 3
         assertEquals(1L, queue.poll().timestamp);
         assertEquals(1, queue.size());
         assertEquals(3L, queue.timestamp());
-        assertEquals(1, queue.timeTracker().size());
 
         // add three 3 out-of-order records with timestamp 4, 1, 2
         // now with 3, 4, 1, 2
@@ -139,28 +136,23 @@ public class RecordQueueTest {
 
         assertEquals(4, queue.size());
         assertEquals(3L, queue.timestamp());
-        assertEquals(2, queue.timeTracker().size());
 
         // poll the third record, now with 4, 1, 2
         assertEquals(3L, queue.poll().timestamp);
         assertEquals(3, queue.size());
-        assertEquals(3L, queue.timestamp());
-        assertEquals(2, queue.timeTracker().size());
+        assertEquals(4L, queue.timestamp());
 
         // poll the rest records
         assertEquals(4L, queue.poll().timestamp);
-        assertEquals(3L, queue.timestamp());
-        assertEquals(2, queue.timeTracker().size());
+        assertEquals(4L, queue.timestamp());
 
         assertEquals(1L, queue.poll().timestamp);
-        assertEquals(3L, queue.timestamp());
-        assertEquals(1, queue.timeTracker().size());
+        assertEquals(4L, queue.timestamp());
 
         assertEquals(2L, queue.poll().timestamp);
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(3L, queue.timestamp());
-        assertEquals(0, queue.timeTracker().size());
+        assertEquals(4L, queue.timestamp());
 
         // add three more records with 4, 5, 6
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -177,14 +169,12 @@ public class RecordQueueTest {
         assertEquals(4L, queue.poll().timestamp);
         assertEquals(2, queue.size());
         assertEquals(5L, queue.timestamp());
-        assertEquals(2, queue.timeTracker().size());
 
         // clear the queue
         queue.clear();
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(0, queue.timeTracker().size());
-        assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
+        assertEquals(RecordQueue.NOT_KNOWN, queue.timestamp());
 
         // re-insert the three records with 4, 5, 6
         queue.addRawRecords(list3);

