Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BF6EC200C5B for ; Wed, 5 Apr 2017 01:27:58 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BDEC0160BA1; Tue, 4 Apr 2017 23:27:58 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DEE94160B90 for ; Wed, 5 Apr 2017 01:27:57 +0200 (CEST) Received: (qmail 85603 invoked by uid 500); 4 Apr 2017 23:27:57 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 85593 invoked by uid 99); 4 Apr 2017 23:27:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Apr 2017 23:27:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E9DB2DFDCD; Tue, 4 Apr 2017 23:27:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ewencp@apache.org To: commits@kafka.apache.org Message-Id: <0719ed7523c44c7c87bd939fdcc65b0e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: MINOR: improve MinTimestampTrackerTest and fix NPE when null element Date: Tue, 4 Apr 2017 23:27:56 +0000 (UTC) archived-at: Tue, 04 Apr 2017 23:27:58 -0000 Repository: kafka Updated Branches: refs/heads/0.10.2 444110eac -> 5a57913d5 MINOR: improve MinTimestampTrackerTest and fix NPE when null element Cherry picked from trunk fix https://github.com/apache/kafka/pull/2611 Author: Eno Thereska Reviewers: Ewen Cheslack-Postava Closes #2803 from enothereska/fix_NPE_0.10.2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a57913d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a57913d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a57913d Branch: refs/heads/0.10.2 Commit: 5a57913d57d8b9dc061688f787a5bef9be626361 Parents: 444110e Author: Eno Thereska Authored: Tue Apr 4 16:27:48 2017 -0700 Committer: Ewen Cheslack-Postava Committed: Tue Apr 4 16:27:48 2017 -0700 ---------------------------------------------------------------------- .../internals/MinTimestampTracker.java | 20 ++-- .../internals/MinTimestampTrackerTest.java | 107 ++++++++----------- 2 files changed, 59 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5a57913d/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java index 655b8b8..bfe98ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java @@ -1,10 +1,10 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.processor.internals; import java.util.LinkedList; @@ -35,7 +34,7 @@ public class MinTimestampTracker implements TimestampTracker { /** * @throws NullPointerException if the element is null */ - public void addElement(Stamped elem) { + public void addElement(final Stamped elem) { if (elem == null) throw new NullPointerException(); Stamped minElem = descendingSubsequence.peekLast(); @@ -46,12 +45,19 @@ public class MinTimestampTracker implements TimestampTracker { descendingSubsequence.offerLast(elem); } - public void removeElement(Stamped elem) { - if (elem != null && descendingSubsequence.peekFirst() == elem) + public void removeElement(final Stamped elem) { + if (elem == null) { + return; + } + + if (descendingSubsequence.peekFirst() == elem) { descendingSubsequence.removeFirst(); + } - if (descendingSubsequence.isEmpty()) + if (descendingSubsequence.isEmpty()) { lastKnownTime = elem.timestamp; + } + } public int size() { http://git-wip-us.apache.org/repos/asf/kafka/blob/5a57913d/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java index 0a1f95c..5243519 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java @@ -1,10 +1,10 @@ /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -14,80 +14,65 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.processor.internals; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import org.junit.Test; public class MinTimestampTrackerTest { - private Stamped elem(long timestamp) { - return new Stamped<>("", timestamp); - } + private MinTimestampTracker tracker = new MinTimestampTracker<>(); - @SuppressWarnings("unchecked") @Test - public void testTracking() { - TimestampTracker tracker = new MinTimestampTracker<>(); - - Object[] elems = new Object[]{ - elem(100), elem(101), elem(102), elem(98), elem(99), elem(100) - }; - - int insertionIndex = 0; - int removalIndex = 0; - - // add 100 - tracker.addElement((Stamped) elems[insertionIndex++]); - assertEquals(100L, tracker.get()); - - // add 101 - tracker.addElement((Stamped) elems[insertionIndex++]); - assertEquals(100L, tracker.get()); - - // remove 100 - tracker.removeElement((Stamped) elems[removalIndex++]); - assertEquals(101L, tracker.get()); - - // add 102 - tracker.addElement((Stamped) elems[insertionIndex++]); - assertEquals(101L, tracker.get()); - - // add 98 - tracker.addElement((Stamped) elems[insertionIndex++]); - assertEquals(98L, tracker.get()); - - // add 99 - tracker.addElement((Stamped) elems[insertionIndex++]); - assertEquals(98L, tracker.get()); - - // add 100 - tracker.addElement((Stamped) elems[insertionIndex++]); - assertEquals(98L, tracker.get()); + public void shouldReturnNotKnownTimestampWhenNoRecordsEverAdded() throws Exception { + assertThat(tracker.get(), equalTo(TimestampTracker.NOT_KNOWN)); + } - // remove 101 - tracker.removeElement((Stamped) elems[removalIndex++]); - assertEquals(98L, tracker.get()); + @Test + public void shouldReturnTimestampOfOnlyRecord() throws Exception { + tracker.addElement(elem(100)); + assertThat(tracker.get(), equalTo(100L)); + } - // remove 102 - tracker.removeElement((Stamped) elems[removalIndex++]); - assertEquals(98L, tracker.get()); + @Test + public void shouldReturnLowestAvailableTimestampFromAllInputs() throws Exception { + tracker.addElement(elem(100)); + tracker.addElement(elem(99)); + tracker.addElement(elem(102)); + assertThat(tracker.get(), equalTo(99L)); + } - // remove 98 - tracker.removeElement((Stamped) elems[removalIndex++]); - assertEquals(99L, tracker.get()); + @Test + public void shouldReturnLowestAvailableTimestampAfterPreviousLowestRemoved() throws Exception { + final Stamped lowest = elem(88); + tracker.addElement(lowest); + tracker.addElement(elem(101)); + tracker.addElement(elem(99)); + tracker.removeElement(lowest); + assertThat(tracker.get(), equalTo(99L)); + } - // remove 99 - tracker.removeElement((Stamped) elems[removalIndex++]); - assertEquals(100L, tracker.get()); + @Test + public void shouldReturnLastKnownTimestampWhenAllElementsHaveBeenRemoved() throws Exception { + final Stamped record = elem(98); + tracker.addElement(record); + tracker.removeElement(record); + assertThat(tracker.get(), equalTo(98L)); + } - // remove 100 - tracker.removeElement((Stamped) elems[removalIndex++]); - assertEquals(100L, tracker.get()); + @Test + public void shouldIgnoreNullRecordOnRemove() throws Exception { + tracker.removeElement(null); + } - assertEquals(insertionIndex, removalIndex); + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerExceptionWhenTryingToAddNullElement() throws Exception { + tracker.addElement(null); } + private Stamped elem(final long timestamp) { + return new Stamped<>("", timestamp); + } } \ No newline at end of file