kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: MINOR: improve MinTimestampTrackerTest and fix NPE when null element
Date Tue, 04 Apr 2017 23:27:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 444110eac -> 5a57913d5


MINOR: improve MinTimestampTrackerTest and fix NPE when null element

Cherry picked from trunk fix https://github.com/apache/kafka/pull/2611

Author: Eno Thereska <eno@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2803 from enothereska/fix_NPE_0.10.2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a57913d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a57913d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a57913d

Branch: refs/heads/0.10.2
Commit: 5a57913d57d8b9dc061688f787a5bef9be626361
Parents: 444110e
Author: Eno Thereska <eno@confluent.io>
Authored: Tue Apr 4 16:27:48 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Apr 4 16:27:48 2017 -0700

----------------------------------------------------------------------
 .../internals/MinTimestampTracker.java          |  20 ++--
 .../internals/MinTimestampTrackerTest.java      | 107 ++++++++-----------
 2 files changed, 59 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5a57913d/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
index 655b8b8..bfe98ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
@@ -1,10 +1,10 @@
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals;
 
 import java.util.LinkedList;
@@ -35,7 +34,7 @@ public class MinTimestampTracker<E> implements TimestampTracker<E>
{
     /**
      * @throws NullPointerException if the element is null
      */
-    public void addElement(Stamped<E> elem) {
+    public void addElement(final Stamped<E> elem) {
         if (elem == null) throw new NullPointerException();
 
         Stamped<E> minElem = descendingSubsequence.peekLast();
@@ -46,12 +45,19 @@ public class MinTimestampTracker<E> implements TimestampTracker<E>
{
         descendingSubsequence.offerLast(elem);
     }
 
-    public void removeElement(Stamped<E> elem) {
-        if (elem != null && descendingSubsequence.peekFirst() == elem)
+    public void removeElement(final Stamped<E> elem) {
+        if (elem == null) {
+            return;
+        }
+
+        if (descendingSubsequence.peekFirst() == elem) {
             descendingSubsequence.removeFirst();
+        }
 
-        if (descendingSubsequence.isEmpty())
+        if (descendingSubsequence.isEmpty()) {
             lastKnownTime = elem.timestamp;
+        }
+
     }
 
     public int size() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a57913d/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
index 0a1f95c..5243519 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
@@ -1,10 +1,10 @@
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,80 +14,65 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals;
 
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 import org.junit.Test;
 
 public class MinTimestampTrackerTest {
 
-    private Stamped<String> elem(long timestamp) {
-        return new Stamped<>("", timestamp);
-    }
+    private MinTimestampTracker<String> tracker = new MinTimestampTracker<>();
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testTracking() {
-        TimestampTracker<String> tracker = new MinTimestampTracker<>();
-
-        Object[] elems = new Object[]{
-            elem(100), elem(101), elem(102), elem(98), elem(99), elem(100)
-        };
-
-        int insertionIndex = 0;
-        int removalIndex = 0;
-
-        // add 100
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(100L, tracker.get());
-
-        // add 101
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(100L, tracker.get());
-
-        // remove 100
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(101L, tracker.get());
-
-        // add 102
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(101L, tracker.get());
-
-        // add 98
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(98L, tracker.get());
-
-        // add 99
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(98L, tracker.get());
-
-        // add 100
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(98L, tracker.get());
+    public void shouldReturnNotKnownTimestampWhenNoRecordsEverAdded() throws Exception {
+        assertThat(tracker.get(), equalTo(TimestampTracker.NOT_KNOWN));
+    }
 
-        // remove 101
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(98L, tracker.get());
+    @Test
+    public void shouldReturnTimestampOfOnlyRecord() throws Exception {
+        tracker.addElement(elem(100));
+        assertThat(tracker.get(), equalTo(100L));
+    }
 
-        // remove 102
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(98L, tracker.get());
+    @Test
+    public void shouldReturnLowestAvailableTimestampFromAllInputs() throws Exception {
+        tracker.addElement(elem(100));
+        tracker.addElement(elem(99));
+        tracker.addElement(elem(102));
+        assertThat(tracker.get(), equalTo(99L));
+    }
 
-        // remove 98
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(99L, tracker.get());
+    @Test
+    public void shouldReturnLowestAvailableTimestampAfterPreviousLowestRemoved() throws Exception
{
+        final Stamped<String> lowest = elem(88);
+        tracker.addElement(lowest);
+        tracker.addElement(elem(101));
+        tracker.addElement(elem(99));
+        tracker.removeElement(lowest);
+        assertThat(tracker.get(), equalTo(99L));
+    }
 
-        // remove 99
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(100L, tracker.get());
+    @Test
+    public void shouldReturnLastKnownTimestampWhenAllElementsHaveBeenRemoved() throws Exception
{
+        final Stamped<String> record = elem(98);
+        tracker.addElement(record);
+        tracker.removeElement(record);
+        assertThat(tracker.get(), equalTo(98L));
+    }
 
-        // remove 100
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(100L, tracker.get());
+    @Test
+    public void shouldIgnoreNullRecordOnRemove() throws Exception {
+        tracker.removeElement(null);
+    }
 
-        assertEquals(insertionIndex, removalIndex);
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionWhenTryingToAddNullElement() throws Exception
{
+        tracker.addElement(null);
     }
 
+    private Stamped<String> elem(final long timestamp) {
+        return new Stamped<>("", timestamp);
+    }
 }
\ No newline at end of file


Mime
View raw message