From commits-return-10013-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Thu Jul 19 17:06:26 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id E7EA3180630 for ; Thu, 19 Jul 2018 17:06:24 +0200 (CEST) Received: (qmail 19712 invoked by uid 500); 19 Jul 2018 15:06:23 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 19703 invoked by uid 99); 19 Jul 2018 15:06:23 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Jul 2018 15:06:23 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 4258F820C4; Thu, 19 Jul 2018 15:06:23 +0000 (UTC) Date: Thu, 19 Jul 2018 15:06:22 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-3514: Remove min timestamp tracker (#5382) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <153201278230.11478.10929880711852628123@gitbox.apache.org> From: guozhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 10a0b8a6baad201227281a34ce80748beddd0739 X-Git-Newrev: 2f6240ac944f2a55ece50a179b79f4e2ee63a621 X-Git-Rev: 2f6240ac944f2a55ece50a179b79f4e2ee63a621 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2f6240a KAFKA-3514: Remove min timestamp tracker (#5382) 2f6240a is described below commit 2f6240ac944f2a55ece50a179b79f4e2ee63a621 Author: Guozhang Wang AuthorDate: Thu Jul 19 08:06:17 2018 -0700 KAFKA-3514: Remove min timestamp tracker (#5382) 1. Remove MinTimestampTracker and its TimestampTracker interface. 2. In RecordQueue, keep track of the head record (deserialized) while put the rest raw bytes records in the fifo queue, the head record as well as the partition timestamp will be updated accordingly. Reviewers: Bill Bejeck , Matthias J. Sax --- .../processor/internals/MinTimestampTracker.java | 84 ------------- .../processor/internals/PartitionGroup.java | 2 +- .../streams/processor/internals/RecordQueue.java | 136 +++++++++------------ .../processor/internals/StandbyContextImpl.java | 3 +- .../streams/processor/internals/StreamTask.java | 2 +- .../processor/internals/TimestampTracker.java | 61 --------- .../internals/MinTimestampTrackerTest.java | 78 ------------ .../processor/internals/PartitionGroupTest.java | 46 +++---- .../processor/internals/RecordQueueTest.java | 26 ++-- 9 files changed, 96 insertions(+), 342 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java deleted file mode 100644 index df35c3d..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals; - -import java.util.LinkedList; - -/** - * MinTimestampTracker implements {@link TimestampTracker} that maintains the min - * timestamp of the maintained stamped elements. - */ -public class MinTimestampTracker implements TimestampTracker { - - // first element has the lowest timestamp and last element the highest - private final LinkedList> ascendingSubsequence = new LinkedList<>(); - - // in the case that incoming traffic is very small, the records maybe put and polled - // within a single iteration, in this case we need to remember the last polled - // record's timestamp - private long lastKnownTime = NOT_KNOWN; - - /** - * @throws NullPointerException if the element is null - */ - public void addElement(final Stamped elem) { - if (elem == null) throw new NullPointerException(); - - Stamped maxElem = ascendingSubsequence.peekLast(); - while (maxElem != null && maxElem.timestamp >= elem.timestamp) { - ascendingSubsequence.removeLast(); - maxElem = ascendingSubsequence.peekLast(); - } - ascendingSubsequence.offerLast(elem); //lower timestamps have been retained and all greater/equal removed - } - - public void removeElement(final Stamped elem) { - if (elem == null) { - return; - } - - if (ascendingSubsequence.peekFirst() == elem) { - ascendingSubsequence.removeFirst(); - } - - if (ascendingSubsequence.isEmpty()) { - lastKnownTime = elem.timestamp; - } - - } - - public int size() { - return ascendingSubsequence.size(); - } - - /** - * @return the lowest tracked timestamp - */ - public long get() { - Stamped stamped = ascendingSubsequence.peekFirst(); - - if (stamped == null) - return lastKnownTime; - else - return stamped.timestamp; - } - - public void clear() { - lastKnownTime = NOT_KNOWN; - ascendingSubsequence.clear(); - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index c809da9..34252bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -58,7 +58,7 @@ public class PartitionGroup { nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp)); this.partitionQueues = partitionQueues; totalBuffered = 0; - streamTime = -1; + streamTime = RecordQueue.NOT_KNOWN; } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 22ef4d6..86340bb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -35,16 +35,19 @@ import java.util.ArrayDeque; * timestamp is monotonically increasing such that once it is advanced, it will not be decremented. */ public class RecordQueue { + + static final long NOT_KNOWN = -1L; + + private final Logger log; private final SourceNode source; - private final TimestampExtractor timestampExtractor; private final TopicPartition partition; - private final ArrayDeque fifoQueue; - private final TimestampTracker> timeTracker; - private final RecordDeserializer recordDeserializer; private final ProcessorContext processorContext; - private final Logger log; + private final TimestampExtractor timestampExtractor; + private final RecordDeserializer recordDeserializer; + private final ArrayDeque> fifoQueue; - private long partitionTime = TimestampTracker.NOT_KNOWN; + private long partitionTime = NOT_KNOWN; + private StampedRecord headRecord = null; RecordQueue(final TopicPartition partition, final SourceNode source, @@ -52,11 +55,10 @@ public class RecordQueue { final DeserializationExceptionHandler deserializationExceptionHandler, final InternalProcessorContext processorContext, final LogContext logContext) { - this.partition = partition; this.source = source; - this.timestampExtractor = timestampExtractor; + this.partition = partition; this.fifoQueue = new ArrayDeque<>(); - this.timeTracker = new MinTimestampTracker<>(); + this.timestampExtractor = timestampExtractor; this.recordDeserializer = new RecordDeserializer( source, deserializationExceptionHandler, @@ -93,48 +95,10 @@ public class RecordQueue { */ int addRawRecords(final Iterable> rawRecords) { for (final ConsumerRecord rawRecord : rawRecords) { - - final ConsumerRecord record = recordDeserializer.deserialize(processorContext, rawRecord); - if (record == null) { - // this only happens if the deserializer decides to skip. It has already logged the reason. - continue; - } - - final long timestamp; - try { - timestamp = timestampExtractor.extract(record, timeTracker.get()); - } catch (final StreamsException internalFatalExtractorException) { - throw internalFatalExtractorException; - } catch (final Exception fatalUserException) { - throw new StreamsException( - String.format("Fatal user code error in TimestampExtractor callback for record %s.", record), - fatalUserException); - } - log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record); - - // drop message if TS is invalid, i.e., negative - if (timestamp < 0) { - log.warn( - "Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]", - record.topic(), record.partition(), record.offset(), timestamp, timestampExtractor.getClass().getCanonicalName() - ); - ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record(); - continue; - } - - final StampedRecord stampedRecord = new StampedRecord(record, timestamp); - fifoQueue.addLast(stampedRecord); - timeTracker.addElement(stampedRecord); + fifoQueue.addLast(rawRecord); } - // update the partition timestamp if its currently - // tracked min timestamp has exceed its value; this will - // usually only take effect for the first added batch - final long timestamp = timeTracker.get(); - - if (timestamp > partitionTime) { - partitionTime = timestamp; - } + maybeUpdateTimestamp(); return size(); } @@ -145,23 +109,12 @@ public class RecordQueue { * @return StampedRecord */ public StampedRecord poll() { - final StampedRecord elem = fifoQueue.pollFirst(); - - if (elem == null) { - return null; - } - - timeTracker.removeElement(elem); + final StampedRecord recordToReturn = headRecord; + headRecord = null; - // only advance the partition timestamp if its currently - // tracked min timestamp has exceeded its value - final long timestamp = timeTracker.get(); + maybeUpdateTimestamp(); - if (timestamp > partitionTime) { - partitionTime = timestamp; - } - - return elem; + return recordToReturn; } /** @@ -170,7 +123,8 @@ public class RecordQueue { * @return the number of records */ public int size() { - return fifoQueue.size(); + // plus one deserialized head record for timestamp tracking + return fifoQueue.size() + (headRecord == null ? 0 : 1); } /** @@ -179,7 +133,7 @@ public class RecordQueue { * @return true if the queue is empty, otherwise false */ public boolean isEmpty() { - return fifoQueue.isEmpty(); + return fifoQueue.isEmpty() && headRecord == null; } /** @@ -196,16 +150,48 @@ public class RecordQueue { */ public void clear() { fifoQueue.clear(); - timeTracker.clear(); - partitionTime = TimestampTracker.NOT_KNOWN; + headRecord = null; + partitionTime = NOT_KNOWN; } - /* - * Returns the timestamp tracker of the record queue - * - * This is only used for testing - */ - TimestampTracker> timeTracker() { - return timeTracker; + private void maybeUpdateTimestamp() { + while (headRecord == null && !fifoQueue.isEmpty()) { + final ConsumerRecord raw = fifoQueue.pollFirst(); + final ConsumerRecord deserialized = recordDeserializer.deserialize(processorContext, raw); + + if (deserialized == null) { + // this only happens if the deserializer decides to skip. It has already logged the reason. + continue; + } + + final long timestamp; + try { + timestamp = timestampExtractor.extract(deserialized, partitionTime); + } catch (final StreamsException internalFatalExtractorException) { + throw internalFatalExtractorException; + } catch (final Exception fatalUserException) { + throw new StreamsException( + String.format("Fatal user code error in TimestampExtractor callback for record %s.", deserialized), + fatalUserException); + } + log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, deserialized); + + // drop message if TS is invalid, i.e., negative + if (timestamp < 0) { + log.warn( + "Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]", + deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName() + ); + ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record(); + continue; + } + + headRecord = new StampedRecord(deserialized, timestamp); + + // update the partition timestamp if the current head record's timestamp has exceed its value + if (timestamp > partitionTime) { + partitionTime = timestamp; + } + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index b01fd5b..4c06c39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -69,7 +69,8 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle return Collections.emptyMap(); } }; - private long streamTime = TimestampTracker.NOT_KNOWN; + + private long streamTime = RecordQueue.NOT_KNOWN; StandbyContextImpl(final TaskId id, final StreamsConfig config, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 6af4c4b..7f121fe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -716,7 +716,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator // if the timestamp is not known yet, meaning there is not enough data accumulated // to reason stream partition time, then skip. - if (timestamp == TimestampTracker.NOT_KNOWN) { + if (timestamp == RecordQueue.NOT_KNOWN) { return false; } else { return streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java deleted file mode 100644 index 30c816d..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals; - -/** - * TimestampTracker is a helper class for a sliding window implementation. - * It is assumed that stamped elements are added or removed in a FIFO manner. - * It maintains the timestamp, such as the min timestamp, the max timestamp, etc. - * of stamped elements that were added but not yet removed. - */ -public interface TimestampTracker { - - long NOT_KNOWN = -1L; - - /** - * Adds a stamped elements to this tracker. - * - * @param elem the added element - */ - void addElement(Stamped elem); - - /** - * Removed a stamped elements to this tracker. - * - * @param elem the removed element - */ - void removeElement(Stamped elem); - - /** - * Returns the current tracked timestamp - * - * @return timestamp, or {@link #NOT_KNOWN} when empty - */ - long get(); - - /** - * Returns the size of internal structure. The meaning of "size" depends on the implementation. - * - * @return size - */ - int size(); - - /** - * Empty the tracker by removing any tracked stamped elements - */ - void clear(); -} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java deleted file mode 100644 index 24653e6..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; - -import org.junit.Test; - -public class MinTimestampTrackerTest { - - private MinTimestampTracker tracker = new MinTimestampTracker<>(); - - @Test - public void shouldReturnNotKnownTimestampWhenNoRecordsEverAdded() { - assertThat(tracker.get(), equalTo(TimestampTracker.NOT_KNOWN)); - } - - @Test - public void shouldReturnTimestampOfOnlyRecord() { - tracker.addElement(elem(100)); - assertThat(tracker.get(), equalTo(100L)); - } - - @Test - public void shouldReturnLowestAvailableTimestampFromAllInputs() { - tracker.addElement(elem(100)); - tracker.addElement(elem(99)); - tracker.addElement(elem(102)); - assertThat(tracker.get(), equalTo(99L)); - } - - @Test - public void shouldReturnLowestAvailableTimestampAfterPreviousLowestRemoved() { - final Stamped lowest = elem(88); - tracker.addElement(lowest); - tracker.addElement(elem(101)); - tracker.addElement(elem(99)); - tracker.removeElement(lowest); - assertThat(tracker.get(), equalTo(99L)); - } - - @Test - public void shouldReturnLastKnownTimestampWhenAllElementsHaveBeenRemoved() { - final Stamped record = elem(98); - tracker.addElement(record); - tracker.removeElement(record); - assertThat(tracker.get(), equalTo(98L)); - } - - @Test - public void shouldIgnoreNullRecordOnRemove() { - tracker.removeElement(null); - } - - @Test(expected = NullPointerException.class) - public void shouldThrowNullPointerExceptionWhenTryingToAddNullElement() { - tracker.addElement(null); - } - - private Stamped elem(final long timestamp) { - return new Stamped<>("", timestamp); - } -} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 9823ae1..b3123e4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -140,49 +140,49 @@ public class PartitionGroupTest { record = group.nextRecord(info); // 1:[5, 2, 4] // 2:[4, 6] - // st: 3 (2's presence prevents it from advancing to 4) + // st: 4 as partition st is now {5, 4} assertEquals(partition1, info.partition()); assertEquals(3L, record.timestamp); assertEquals(5, group.numBuffered()); assertEquals(3, group.numBuffered(partition1)); assertEquals(2, group.numBuffered(partition2)); - assertEquals(3L, group.timestamp()); + assertEquals(4L, group.timestamp()); // get one record, time should not be advanced record = group.nextRecord(info); - // 1:[2, 4] - // 2:[4, 6] - // st: 3 (2's presence prevents it from advancing to 4) - assertEquals(partition1, info.partition()); - assertEquals(5L, record.timestamp); + // 1:[5, 2, 4] + // 2:[6] + // st: 5 as partition st is now {5, 6} + assertEquals(partition2, info.partition()); + assertEquals(4L, record.timestamp); assertEquals(4, group.numBuffered()); - assertEquals(2, group.numBuffered(partition1)); - assertEquals(2, group.numBuffered(partition2)); - assertEquals(3L, group.timestamp()); + assertEquals(3, group.numBuffered(partition1)); + assertEquals(1, group.numBuffered(partition2)); + assertEquals(5L, group.timestamp()); // get one more record, now time should be advanced record = group.nextRecord(info); - // 1:[4] - // 2:[4, 6] - // st: 4 + // 1:[2, 4] + // 2:[6] + // st: 5 assertEquals(partition1, info.partition()); - assertEquals(2L, record.timestamp); + assertEquals(5L, record.timestamp); assertEquals(3, group.numBuffered()); - assertEquals(1, group.numBuffered(partition1)); - assertEquals(2, group.numBuffered(partition2)); - assertEquals(4L, group.timestamp()); + assertEquals(2, group.numBuffered(partition1)); + assertEquals(1, group.numBuffered(partition2)); + assertEquals(5L, group.timestamp()); // get one more record, time should not be advanced record = group.nextRecord(info); // 1:[4] // 2:[6] - // st: 4 - assertEquals(partition2, info.partition()); - assertEquals(4L, record.timestamp); + // st: 5 + assertEquals(partition1, info.partition()); + assertEquals(2L, record.timestamp); assertEquals(2, group.numBuffered()); assertEquals(1, group.numBuffered(partition1)); assertEquals(1, group.numBuffered(partition2)); - assertEquals(4L, group.timestamp()); + assertEquals(5L, group.timestamp()); // get one more record, time should not be advanced record = group.nextRecord(info); @@ -194,7 +194,7 @@ public class PartitionGroupTest { assertEquals(1, group.numBuffered()); assertEquals(0, group.numBuffered(partition1)); assertEquals(1, group.numBuffered(partition2)); - assertEquals(4L, group.timestamp()); + assertEquals(5L, group.timestamp()); // get one more record, time should not be advanced record = group.nextRecord(info); @@ -206,7 +206,7 @@ public class PartitionGroupTest { assertEquals(0, group.numBuffered()); assertEquals(0, group.numBuffered(partition1)); assertEquals(0, group.numBuffered(partition2)); - assertEquals(4L, group.timestamp()); + assertEquals(5L, group.timestamp()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index 3ed9e3b..cf1d63f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -102,7 +102,7 @@ public class RecordQueueTest { assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp()); + assertEquals(RecordQueue.NOT_KNOWN, queue.timestamp()); // add three 3 out-of-order records with timestamp 2, 1, 3 final List> list1 = Arrays.asList( @@ -113,20 +113,17 @@ public class RecordQueueTest { queue.addRawRecords(list1); assertEquals(3, queue.size()); - assertEquals(1L, queue.timestamp()); - assertEquals(2, queue.timeTracker().size()); + assertEquals(2L, queue.timestamp()); // poll the first record, now with 1, 3 assertEquals(2L, queue.poll().timestamp); assertEquals(2, queue.size()); - assertEquals(1L, queue.timestamp()); - assertEquals(2, queue.timeTracker().size()); + assertEquals(2L, queue.timestamp()); // poll the second record, now with 3 assertEquals(1L, queue.poll().timestamp); assertEquals(1, queue.size()); assertEquals(3L, queue.timestamp()); - assertEquals(1, queue.timeTracker().size()); // add three 3 out-of-order records with timestamp 4, 1, 2 // now with 3, 4, 1, 2 @@ -139,28 +136,23 @@ public class RecordQueueTest { assertEquals(4, queue.size()); assertEquals(3L, queue.timestamp()); - assertEquals(2, queue.timeTracker().size()); // poll the third record, now with 4, 1, 2 assertEquals(3L, queue.poll().timestamp); assertEquals(3, queue.size()); - assertEquals(3L, queue.timestamp()); - assertEquals(2, queue.timeTracker().size()); + assertEquals(4L, queue.timestamp()); // poll the rest records assertEquals(4L, queue.poll().timestamp); - assertEquals(3L, queue.timestamp()); - assertEquals(2, queue.timeTracker().size()); + assertEquals(4L, queue.timestamp()); assertEquals(1L, queue.poll().timestamp); - assertEquals(3L, queue.timestamp()); - assertEquals(1, queue.timeTracker().size()); + assertEquals(4L, queue.timestamp()); assertEquals(2L, queue.poll().timestamp); assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(3L, queue.timestamp()); - assertEquals(0, queue.timeTracker().size()); + assertEquals(4L, queue.timestamp()); // add three more records with 4, 5, 6 final List> list3 = Arrays.asList( @@ -177,14 +169,12 @@ public class RecordQueueTest { assertEquals(4L, queue.poll().timestamp); assertEquals(2, queue.size()); assertEquals(5L, queue.timestamp()); - assertEquals(2, queue.timeTracker().size()); // clear the queue queue.clear(); assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(0, queue.timeTracker().size()); - assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp()); + assertEquals(RecordQueue.NOT_KNOWN, queue.timestamp()); // re-insert the three records with 4, 5, 6 queue.addRawRecords(list3);