kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-4651: improve test coverage of stores (#4555)
Date: Tue, 20 Feb 2018 20:46:40 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 256708d  KAFKA-4651: improve test coverage of stores (#4555)
256708d is described below

commit 256708dbbb7204e4025f2ca74eceea1170236255
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Tue Feb 20 15:46:36 2018 -0500

    KAFKA-4651: improve test coverage of stores (#4555)
    
    Working on increasing the coverage of stores in unit tests.
    Started with `InMemoryKeyValueLoggedStore`
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../state/internals/RocksDBSessionStore.java       |  28 -----
 .../streams/state/internals/RocksDBStore.java      |  22 ----
 .../state/internals/AbstractKeyValueStoreTest.java |  33 ++++++
 .../internals/InMemoryKeyValueLoggedStoreTest.java |  24 +---
 .../internals/MeteredKeyValueBytesStoreTest.java   |  14 +++
 .../streams/state/internals/RocksDBStoreTest.java  | 129 ++++++++++++---------
 .../streams/state/internals/SegmentsTest.java      |   6 +
 7 files changed, 128 insertions(+), 128 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index 77b1abb..c9267dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
@@ -38,33 +37,6 @@ public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractState
     protected StateSerdes<K, AGG> serdes;
     protected String topic;
 
-    // this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs
-    private static class RocksDBSessionBytesStore extends RocksDBSessionStore<Bytes, byte[]> {
-        RocksDBSessionBytesStore(final SegmentedBytesStore inner) {
-            super(inner, Serdes.Bytes(), Serdes.ByteArray());
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) {
-            final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(key, earliestSessionEndTime, latestSessionStartTime);
-            return WrappedSessionStoreIterator.bytesIterator(bytesIterator, serdes);
-        }
-
-        @Override
-        public void remove(final Windowed<Bytes> key) {
-            bytesStore.remove(SessionKeySerde.bytesToBinary(key));
-        }
-
-        @Override
-        public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
-            bytesStore.put(SessionKeySerde.bytesToBinary(sessionKey), aggregate);
-        }
-    }
-
-    static RocksDBSessionStore<Bytes, byte[]> bytesStore(final SegmentedBytesStore inner) {
-        return new RocksDBSessionBytesStore(inner);
-    }
-
     RocksDBSessionStore(final SegmentedBytesStore bytesStore,
                         final Serde<K> keySerde,
                         final Serde<AGG> aggSerde) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a2e45e0..f54c783 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -363,28 +363,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         return rocksDbIterator;
     }
 
-    public synchronized KeyValue<Bytes, byte[]> first() {
-        validateStoreOpen();
-
-        final RocksIterator innerIter = db.newIterator();
-        innerIter.seekToFirst();
-        final KeyValue<Bytes, byte[]> pair = new KeyValue<>(new Bytes(innerIter.key()), innerIter.value());
-        innerIter.close();
-
-        return pair;
-    }
-
-    public synchronized KeyValue<Bytes, byte[]> last() {
-        validateStoreOpen();
-
-        final RocksIterator innerIter = db.newIterator();
-        innerIter.seekToLast();
-        final KeyValue<Bytes, byte[]> pair = new KeyValue<>(new Bytes(innerIter.key()), innerIter.value());
-        innerIter.close();
-
-        return pair;
-    }
-
     /**
      * Return an approximate count of key-value mappings in this store.
      *
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 0583e91..937b1d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -30,12 +30,18 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 public abstract class AbstractKeyValueStoreTest {
@@ -341,4 +347,31 @@ public abstract class AbstractKeyValueStoreTest {
         store.flush();
         assertEquals(5, store.approximateNumEntries());
     }
+
+    @Test
+    public void shouldPutAll() {
+        List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, "one"));
+        entries.add(new KeyValue<>(2, "two"));
+
+        store.putAll(entries);
+
+        final List<KeyValue<Integer, String>> allReturned = new ArrayList<>();
+        final List<KeyValue<Integer, String>> expectedReturned = Arrays.asList(KeyValue.pair(1, "one"), KeyValue.pair(2, "two"));
+        final Iterator<KeyValue<Integer, String>> iterator = store.all();
+
+        while (iterator.hasNext()) {
+            allReturned.add(iterator.next());
+        }
+        assertThat(allReturned, equalTo(expectedReturned));
+
+    }
+
+    @Test
+    public void shouldDeleteFromStore() {
+        store.put(1, "one");
+        store.put(2, "two");
+        store.delete(2);
+        assertNull(store.get(2));
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index adaab00..0848970 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -17,19 +17,13 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
 
 public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
 
@@ -37,24 +31,14 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context) {
         final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
-                Stores.inMemoryKeyValueStore("my-store"),
-                (Serde<K>) context.keySerde(),
-                (Serde<V>) context.valueSerde())
-                .withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
+            Stores.inMemoryKeyValueStore("my-store"),
+            (Serde<K>) context.keySerde(),
+            (Serde<V>) context.valueSerde())
+            .withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
 
         final StateStore store = storeBuilder.build();
         store.init(context, store);
 
         return (KeyValueStore<K, V>) store;
     }
-
-    @Test
-    public void shouldPutAll() {
-        List<KeyValue<Integer, String>> entries = new ArrayList<>();
-        entries.add(new KeyValue<>(1, "1"));
-        entries.add(new KeyValue<>(2, "2"));
-        store.putAll(entries);
-        assertEquals(store.get(1), "1");
-        assertEquals(store.get(2), "2");
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
index 3b9d13f..8880007 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
@@ -193,6 +193,20 @@ public class MeteredKeyValueBytesStoreTest {
         EasyMock.verify(inner);
     }
 
+    @Test
+    public void shouldFlushInnerWhenFlushTimeRecords() {
+        inner.flush();
+        EasyMock.expectLastCall().once();
+        init();
+
+        metered.flush();
+
+        final KafkaMetric metric = metric("flush-rate");
+        assertTrue(metric.value() > 0);
+        EasyMock.verify(inner);
+    }
+
+
     private KafkaMetric metric(final MetricName metricName) {
         return this.metrics.metric(metricName);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 49e893b..4087336 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -61,13 +61,13 @@ public class RocksDBStoreTest {
 
     private Serializer<String> stringSerializer = new StringSerializer();
     private Deserializer<String> stringDeserializer = new StringDeserializer();
-    private RocksDBStore subject;
+    private RocksDBStore rocksDBStore;
     private MockProcessorContext context;
     private File dir;
 
     @Before
     public void setUp() {
-        subject = new RocksDBStore("test");
+        rocksDBStore = new RocksDBStore("test");
         dir = TestUtils.tempDirectory();
         context = new MockProcessorContext(dir,
             Serdes.String(),
@@ -78,18 +78,18 @@ public class RocksDBStoreTest {
 
     @After
     public void tearDown() {
-        subject.close();
+        rocksDBStore.close();
     }
 
     @Test
     public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() throws Exception {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
 
         final String message = "how can a 4 ounce bird carry a 2lb coconut";
         int intKey = 1;
         for (int i = 0; i < 2000000; i++) {
-            subject.put(new Bytes(stringSerializer.serialize(null, "theKeyIs" + intKey++)),
-                stringSerializer.serialize(null, message));
+            rocksDBStore.put(new Bytes(stringSerializer.serialize(null, "theKeyIs" + intKey++)),
+                             stringSerializer.serialize(null, message));
         }
 
         final List<KeyValue<byte[], byte[]>> restoreBytes = new ArrayList<>();
@@ -103,7 +103,7 @@ public class RocksDBStoreTest {
         assertThat(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "restoredKey")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "restoredKey")))),
             equalTo("restoredValue"));
     }
 
@@ -114,7 +114,7 @@ public class RocksDBStoreTest {
         configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test-server:9092");
         configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
         MockRocksDbConfigSetter.called = false;
-        subject.openDB(new MockProcessorContext(tempDir, new StreamsConfig(configs)));
+        rocksDBStore.openDB(new MockProcessorContext(tempDir, new StreamsConfig(configs)));
 
         assertTrue(MockRocksDbConfigSetter.called);
     }
@@ -129,7 +129,7 @@ public class RocksDBStoreTest {
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         tmpDir.setReadOnly();
 
-        subject.openDB(tmpContext);
+        rocksDBStore.openDB(tmpContext);
     }
 
     @Test
@@ -145,91 +145,104 @@ public class RocksDBStoreTest {
             new Bytes(stringSerializer.serialize(null, "3")),
             stringSerializer.serialize(null, "c")));
 
-        subject.init(context, subject);
-        subject.putAll(entries);
-        subject.flush();
+        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
 
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))),
             "a");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))),
             "b");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))),
             "c");
     }
 
     @Test
     public void shouldTogglePrepareForBulkloadSetting() {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
-            (RocksDBStore.RocksDBBatchingRestoreCallback) subject.batchingStateRestoreCallback;
+            (RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback;
 
         restoreListener.onRestoreStart(null, null, 0, 0);
-        assertTrue("Should have set bulk loading to true", subject.isPrepareForBulkload());
+        assertTrue("Should have set bulk loading to true", rocksDBStore.isPrepareForBulkload());
 
         restoreListener.onRestoreEnd(null, null, 0);
-        assertFalse("Should have set bulk loading to false", subject.isPrepareForBulkload());
+        assertFalse("Should have set bulk loading to false", rocksDBStore.isPrepareForBulkload());
     }
 
     @Test
     public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() throws Exception {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
-        subject.init(context, subject);
-        context.restore(subject.name(), entries);
+        rocksDBStore.init(context, rocksDBStore);
+        context.restore(rocksDBStore.name(), entries);
 
         RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
-            (RocksDBStore.RocksDBBatchingRestoreCallback) subject.batchingStateRestoreCallback;
+            (RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback;
 
         restoreListener.onRestoreStart(null, null, 0, 0);
-        assertTrue("Should have not set bulk loading to true", subject.isPrepareForBulkload());
+        assertTrue("Should have not set bulk loading to true", rocksDBStore.isPrepareForBulkload());
 
         restoreListener.onRestoreEnd(null, null, 0);
-        assertFalse("Should have set bulk loading to false", subject.isPrepareForBulkload());
+        assertFalse("Should have set bulk loading to false", rocksDBStore.isPrepareForBulkload());
     }
 
     @Test
     public void shouldRestoreAll() throws Exception {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
-        subject.init(context, subject);
-        context.restore(subject.name(), entries);
+        rocksDBStore.init(context, rocksDBStore);
+        context.restore(rocksDBStore.name(), entries);
 
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))),
             "a");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))),
             "b");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))),
             "c");
     }
 
+    @Test
+    public void shouldPutOnlyIfAbsentValue() throws Exception {
+        rocksDBStore.init(context, rocksDBStore);
+        final Bytes keyBytes = new Bytes(stringSerializer.serialize(null, "one"));
+        final byte[] valueBytes = stringSerializer.serialize(null, "A");
+        final byte[] valueBytesUpdate = stringSerializer.serialize(null, "B");
+
+        rocksDBStore.putIfAbsent(keyBytes, valueBytes);
+        rocksDBStore.putIfAbsent(keyBytes, valueBytesUpdate);
+
+        final String retrievedValue = stringDeserializer.deserialize(null, rocksDBStore.get(keyBytes));
+        assertEquals(retrievedValue, "A");
+    }
 
     @Test
     public void shouldHandleDeletesOnRestoreAll() throws Exception {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
         entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
 
-        subject.init(context, subject);
-        context.restore(subject.name(), entries);
+        rocksDBStore.init(context, rocksDBStore);
+        context.restore(rocksDBStore.name(), entries);
 
-        final KeyValueIterator<Bytes, byte[]> iterator = subject.all();
+        final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
         final Set<String> keys = new HashSet<>();
 
         while (iterator.hasNext()) {
@@ -250,10 +263,10 @@ public class RocksDBStoreTest {
         // this will restore key "1" as WriteBatch applies updates in order
         entries.add(new KeyValue<>("1".getBytes("UTF-8"), "restored".getBytes("UTF-8")));
 
-        subject.init(context, subject);
-        context.restore(subject.name(), entries);
+        rocksDBStore.init(context, rocksDBStore);
+        context.restore(rocksDBStore.name(), entries);
 
-        final KeyValueIterator<Bytes, byte[]> iterator = subject.all();
+        final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
         final Set<String> keys = new HashSet<>();
 
         while (iterator.hasNext()) {
@@ -265,17 +278,17 @@ public class RocksDBStoreTest {
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))),
             "restored");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))),
             "b");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))),
             "c");
     }
 
@@ -283,24 +296,24 @@ public class RocksDBStoreTest {
     public void shouldRestoreThenDeleteOnRestoreAll() throws Exception {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
 
-        context.restore(subject.name(), entries);
+        context.restore(rocksDBStore.name(), entries);
 
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))),
             "a");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))),
             "b");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))),
             "c");
 
         entries.clear();
@@ -309,9 +322,9 @@ public class RocksDBStoreTest {
         entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
         entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
 
-        context.restore(subject.name(), entries);
+        context.restore(rocksDBStore.name(), entries);
 
-        final KeyValueIterator<Bytes, byte[]> iterator = subject.all();
+        final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all();
         final Set<String> keys = new HashSet<>();
 
         while (iterator.hasNext()) {
@@ -325,57 +338,57 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullPut() {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         try {
-            subject.put(null, stringSerializer.serialize(null, "someVal"));
+            rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
             fail("Should have thrown NullPointerException on null put()");
         } catch (NullPointerException e) { }
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullPutAll() {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         try {
-            subject.put(null, stringSerializer.serialize(null, "someVal"));
+            rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
             fail("Should have thrown NullPointerException on null put()");
         } catch (NullPointerException e) { }
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullGet() {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         try {
-            subject.get(null);
+            rocksDBStore.get(null);
             fail("Should have thrown NullPointerException on null get()");
         } catch (NullPointerException e) { }
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnDelete() {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         try {
-            subject.delete(null);
+            rocksDBStore.delete(null);
             fail("Should have thrown NullPointerException on deleting null key");
         } catch (NullPointerException e) { }
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnRange() {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         try {
-            subject.range(null, new Bytes(stringSerializer.serialize(null, "2")));
+            rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2")));
             fail("Should have thrown NullPointerException on deleting null key");
         } catch (NullPointerException e) { }
     }
 
     @Test(expected = ProcessorStateException.class)
     public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         Utils.delete(dir);
-        subject.put(
+        rocksDBStore.put(
             new Bytes(stringSerializer.serialize(null, "anyKey")),
             stringSerializer.serialize(null, "anyValue"));
-        subject.flush();
+        rocksDBStore.flush();
     }
 
     public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 46606de..deb26f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -133,6 +133,12 @@ public class SegmentsTest {
     }
 
     @Test
+    public void shouldGetCorrectSegmentString() {
+        final Segment segment = segments.getOrCreateSegment(0, context);
+        assertEquals("Segment(id=0, name=test.0)", segment.toString());
+    }
+
+    @Test
     public void shouldCloseAllOpenSegments() {
         final Segment first = segments.getOrCreateSegment(0, context);
         final Segment second = segments.getOrCreateSegment(1, context);
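
Part of this change moves generic store tests (such as the relocated `shouldPutAll`) up into `AbstractKeyValueStoreTest`, so each concrete store test class (in-memory, logged, metered, RocksDB) inherits them and only supplies its own store factory via `createKeyValueStore()`. A minimal, self-contained sketch of that abstract-test pattern follows; the `SimpleStore` interface and the class names below are hypothetical illustrations, not types from this commit.

import java.util.HashMap;
import java.util.Map;

import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

// Shared behavioral tests: written once, run against every implementation.
public abstract class AbstractSimpleStoreTest {

    // Hypothetical store interface, standing in for KeyValueStore<K, V>.
    interface SimpleStore {
        void put(Integer key, String value);
        String get(Integer key);
        String delete(Integer key);
    }

    private SimpleStore store;

    // Each concrete test class overrides this with its own implementation,
    // mirroring createKeyValueStore() in AbstractKeyValueStoreTest.
    protected abstract SimpleStore createStore();

    @Before
    public void setUp() {
        store = createStore();
    }

    @Test
    public void shouldGetWhatWasPut() {
        store.put(1, "one");
        assertEquals("one", store.get(1));
    }

    @Test
    public void shouldDeleteFromStore() {
        store.put(2, "two");
        store.delete(2);
        assertNull(store.get(2));
    }
}

// One small subclass per implementation; all inherited tests run against it.
class InMemorySimpleStoreTest extends AbstractSimpleStoreTest {
    @Override
    protected SimpleStore createStore() {
        final Map<Integer, String> map = new HashMap<>();
        return new SimpleStore() {
            public void put(final Integer key, final String value) { map.put(key, value); }
            public String get(final Integer key) { return map.get(key); }
            public String delete(final Integer key) { return map.remove(key); }
        };
    }
}

With this layout, a new store implementation gets the whole behavioral suite for free by overriding the single factory method, which is why the commit deletes the per-class `shouldPutAll` and adds it to the shared base class instead.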

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.
