From: guozhang@apache.org
To: commits@kafka.apache.org
Date: Tue, 20 Feb 2018 20:46:40 +0000
Subject: [kafka] branch trunk updated: KAFKA-4651: improve test coverage of stores (#4555)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 256708d  KAFKA-4651: improve test coverage of stores (#4555)
256708d is described below

commit 256708dbbb7204e4025f2ca74eceea1170236255
Author: Bill Bejeck
AuthorDate: Tue Feb 20 15:46:36 2018 -0500

    KAFKA-4651: improve test coverage of stores (#4555)

    Working on increasing the coverage of stores in unit tests. Started with `InMemoryKeyValueLoggedStore`.

    Reviewers: Matthias J. Sax, Guozhang Wang
---
 .../state/internals/RocksDBSessionStore.java       |  28 -----
 .../streams/state/internals/RocksDBStore.java      |  22 ----
 .../state/internals/AbstractKeyValueStoreTest.java |  33 ++++++
 .../internals/InMemoryKeyValueLoggedStoreTest.java |  24 +---
 .../internals/MeteredKeyValueBytesStoreTest.java   |  14 +++
 .../streams/state/internals/RocksDBStoreTest.java  | 129 ++++++++++++---------
 .../streams/state/internals/SegmentsTest.java      |   6 +
 7 files changed, 128 insertions(+), 128 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index 77b1abb..c9267dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
@@ -38,33 +37,6 @@ public class RocksDBSessionStore extends WrappedStateStore.AbstractState
     protected StateSerdes serdes;
     protected String topic;
 
-    // this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs
-    private static class RocksDBSessionBytesStore extends RocksDBSessionStore {
-        RocksDBSessionBytesStore(final SegmentedBytesStore inner) {
-            super(inner, Serdes.Bytes(), Serdes.ByteArray());
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) {
-            final KeyValueIterator bytesIterator = bytesStore.fetch(key, earliestSessionEndTime, latestSessionStartTime);
-            return WrappedSessionStoreIterator.bytesIterator(bytesIterator, serdes);
-        }
-
-        @Override
-        public void remove(final Windowed key) {
-            bytesStore.remove(SessionKeySerde.bytesToBinary(key));
-        }
-
-        @Override
-        public void put(final Windowed sessionKey, final byte[] aggregate) {
-            bytesStore.put(SessionKeySerde.bytesToBinary(sessionKey), aggregate);
-        }
-    }
-
-    static RocksDBSessionStore bytesStore(final SegmentedBytesStore inner) {
-        return new RocksDBSessionBytesStore(inner);
-    }
-
     RocksDBSessionStore(final SegmentedBytesStore bytesStore,
                         final Serde keySerde,
                         final Serde aggSerde) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a2e45e0..f54c783 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -363,28 +363,6 @@ public class RocksDBStore implements KeyValueStore {
         return rocksDbIterator;
     }
 
-    public synchronized KeyValue first() {
-        validateStoreOpen();
-
-        final RocksIterator innerIter = db.newIterator();
-        innerIter.seekToFirst();
-        final KeyValue pair = new KeyValue<>(new Bytes(innerIter.key()), innerIter.value());
-        innerIter.close();
-
-        return pair;
-    }
-
-    public synchronized KeyValue last() {
-        validateStoreOpen();
-
-        final RocksIterator innerIter = db.newIterator();
-        innerIter.seekToLast();
-        final KeyValue pair = new KeyValue<>(new Bytes(innerIter.key()), innerIter.value());
-        innerIter.close();
-
-        return pair;
-    }
-
     /**
      * Return an approximate count of key-value mappings in this store.
      *
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 0583e91..937b1d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -30,12 +30,18 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 public abstract class AbstractKeyValueStoreTest {
@@ -341,4 +347,31 @@
         store.flush();
         assertEquals(5, store.approximateNumEntries());
     }
+
+    @Test
+    public void shouldPutAll() {
+        List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, "one"));
+        entries.add(new KeyValue<>(2, "two"));
+
+        store.putAll(entries);
+
+        final List<KeyValue<Integer, String>> allReturned = new ArrayList<>();
+        final List<KeyValue<Integer, String>> expectedReturned = Arrays.asList(KeyValue.pair(1, "one"), KeyValue.pair(2, "two"));
+        final Iterator<KeyValue<Integer, String>> iterator = store.all();
+
+        while (iterator.hasNext()) {
+            allReturned.add(iterator.next());
+        }
+        assertThat(allReturned, equalTo(expectedReturned));
+
+    }
+
+    @Test
+    public void shouldDeleteFromStore() {
+        store.put(1, "one");
+        store.put(2, "two");
+        store.delete(2);
+        assertNull(store.get(2));
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index adaab00..0848970 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -17,19 +17,13 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
 
 public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
 
@@ -37,24 +31,14 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
     @Override
     protected KeyValueStore createKeyValueStore(final ProcessorContext context) {
         final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
-            Stores.inMemoryKeyValueStore("my-store"),
-            (Serde) context.keySerde(),
-            (Serde) context.valueSerde())
-            .withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
+                Stores.inMemoryKeyValueStore("my-store"),
+                (Serde) context.keySerde(),
+                (Serde) context.valueSerde())
+                .withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
 
         final StateStore store = storeBuilder.build();
         store.init(context, store);
 
         return (KeyValueStore) store;
     }
-
-    @Test
-    public void shouldPutAll() {
-        List<KeyValue<Integer, String>> entries = new ArrayList<>();
-        entries.add(new KeyValue<>(1, "1"));
-        entries.add(new KeyValue<>(2, "2"));
-        store.putAll(entries);
-        assertEquals(store.get(1), "1");
-        assertEquals(store.get(2), "2");
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
index 3b9d13f..8880007 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
@@ -193,6 +193,20 @@ public class MeteredKeyValueBytesStoreTest {
         EasyMock.verify(inner);
     }
 
+    @Test
+    public void shouldFlushInnerWhenFlushTimeRecords() {
+        inner.flush();
+        EasyMock.expectLastCall().once();
+        init();
+
+        metered.flush();
+
+        final KafkaMetric metric = metric("flush-rate");
+        assertTrue(metric.value() > 0);
+        EasyMock.verify(inner);
+    }
+
+
     private KafkaMetric metric(final MetricName metricName) {
         return this.metrics.metric(metricName);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 49e893b..4087336 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -61,13 +61,13 @@ public class RocksDBStoreTest {
     private Serializer stringSerializer = new StringSerializer();
     private Deserializer stringDeserializer = new StringDeserializer();
 
-    private RocksDBStore subject;
+    private RocksDBStore rocksDBStore;
     private MockProcessorContext context;
     private File dir;
 
     @Before
     public void setUp() {
-        subject = new RocksDBStore("test");
+        rocksDBStore = new RocksDBStore("test");
         dir = TestUtils.tempDirectory();
         context = new MockProcessorContext(dir,
             Serdes.String(),
@@ -78,18 +78,18 @@
 
     @After
     public void tearDown() {
-        subject.close();
+        rocksDBStore.close();
     }
 
     @Test
     public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() throws Exception {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
 
         final String message = "how can a 4 ounce bird carry a 2lb coconut";
         int intKey = 1;
         for (int i = 0; i < 2000000; i++) {
-            subject.put(new Bytes(stringSerializer.serialize(null, "theKeyIs" + intKey++)),
-                stringSerializer.serialize(null, message));
+            rocksDBStore.put(new Bytes(stringSerializer.serialize(null, "theKeyIs" + intKey++)),
+                stringSerializer.serialize(null, message));
         }
 
         final List<KeyValue<byte[], byte[]>> restoreBytes = new ArrayList<>();
@@ -103,7 +103,7 @@
         assertThat(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "restoredKey")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "restoredKey")))),
             equalTo("restoredValue"));
     }
 
@@ -114,7 +114,7 @@
         configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test-server:9092");
         configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
         MockRocksDbConfigSetter.called = false;
-        subject.openDB(new MockProcessorContext(tempDir, new StreamsConfig(configs)));
+        rocksDBStore.openDB(new MockProcessorContext(tempDir, new StreamsConfig(configs)));
 
         assertTrue(MockRocksDbConfigSetter.called);
     }
 
@@ -129,7 +129,7 @@
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
 
         tmpDir.setReadOnly();
-        subject.openDB(tmpContext);
+        rocksDBStore.openDB(tmpContext);
     }
 
     @Test
@@ -145,91 +145,104 @@
             new Bytes(stringSerializer.serialize(null, "3")),
             stringSerializer.serialize(null, "c")));
 
-        subject.init(context, subject);
-        subject.putAll(entries);
-        subject.flush();
+        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.putAll(entries);
+        rocksDBStore.flush();
 
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))),
             "a");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))),
             "b");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))),
             "c");
     }
 
     @Test
     public void shouldTogglePrepareForBulkloadSetting() {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
 
         RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
-            (RocksDBStore.RocksDBBatchingRestoreCallback) subject.batchingStateRestoreCallback;
+            (RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback;
 
         restoreListener.onRestoreStart(null, null, 0, 0);
-        assertTrue("Should have set bulk loading to true", subject.isPrepareForBulkload());
+        assertTrue("Should have set bulk loading to true", rocksDBStore.isPrepareForBulkload());
 
         restoreListener.onRestoreEnd(null, null, 0);
-        assertFalse("Should have set bulk loading to false", subject.isPrepareForBulkload());
+        assertFalse("Should have set bulk loading to false", rocksDBStore.isPrepareForBulkload());
     }
 
     @Test
     public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() throws Exception {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
-        subject.init(context, subject);
-        context.restore(subject.name(), entries);
+        rocksDBStore.init(context, rocksDBStore);
+        context.restore(rocksDBStore.name(), entries);
 
         RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
-            (RocksDBStore.RocksDBBatchingRestoreCallback) subject.batchingStateRestoreCallback;
+            (RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback;
 
         restoreListener.onRestoreStart(null, null, 0, 0);
-        assertTrue("Should have not set bulk loading to true", subject.isPrepareForBulkload());
+        assertTrue("Should have not set bulk loading to true", rocksDBStore.isPrepareForBulkload());
 
         restoreListener.onRestoreEnd(null, null, 0);
-        assertFalse("Should have set bulk loading to false", subject.isPrepareForBulkload());
+        assertFalse("Should have set bulk loading to false", rocksDBStore.isPrepareForBulkload());
     }
 
     @Test
     public void shouldRestoreAll() throws Exception {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
-        subject.init(context, subject);
-        context.restore(subject.name(), entries);
+        rocksDBStore.init(context, rocksDBStore);
+        context.restore(rocksDBStore.name(), entries);
 
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))),
             "a");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))),
             "b");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))),
             "c");
     }
 
+    @Test
+    public void shouldPutOnlyIfAbsentValue() throws Exception {
+        rocksDBStore.init(context, rocksDBStore);
+        final Bytes keyBytes = new Bytes(stringSerializer.serialize(null, "one"));
+        final byte[] valueBytes = stringSerializer.serialize(null, "A");
+        final byte[] valueBytesUpdate = stringSerializer.serialize(null, "B");
+
+        rocksDBStore.putIfAbsent(keyBytes, valueBytes);
+        rocksDBStore.putIfAbsent(keyBytes, valueBytesUpdate);
+
+        final String retrievedValue = stringDeserializer.deserialize(null, rocksDBStore.get(keyBytes));
+        assertEquals(retrievedValue, "A");
+    }
 
     @Test
     public void shouldHandleDeletesOnRestoreAll() throws Exception {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
         entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
 
-        subject.init(context, subject);
-        context.restore(subject.name(), entries);
+        rocksDBStore.init(context, rocksDBStore);
+        context.restore(rocksDBStore.name(), entries);
 
-        final KeyValueIterator iterator = subject.all();
+        final KeyValueIterator iterator = rocksDBStore.all();
         final Set keys = new HashSet<>();
 
         while (iterator.hasNext()) {
@@ -250,10 +263,10 @@
         // this will restore key "1" as WriteBatch applies updates in order
         entries.add(new KeyValue<>("1".getBytes("UTF-8"), "restored".getBytes("UTF-8")));
 
-        subject.init(context, subject);
-        context.restore(subject.name(), entries);
+        rocksDBStore.init(context, rocksDBStore);
+        context.restore(rocksDBStore.name(), entries);
 
-        final KeyValueIterator iterator = subject.all();
+        final KeyValueIterator iterator = rocksDBStore.all();
         final Set keys = new HashSet<>();
 
         while (iterator.hasNext()) {
@@ -265,17 +278,17 @@
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))),
             "restored");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))),
             "b");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))),
             "c");
     }
 
@@ -283,24 +296,24 @@
     public void shouldRestoreThenDeleteOnRestoreAll() throws Exception {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
 
-        context.restore(subject.name(), entries);
+        context.restore(rocksDBStore.name(), entries);
 
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "1")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))),
             "a");
         assertEquals(
            stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "2")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))),
             "b");
         assertEquals(
             stringDeserializer.deserialize(
                 null,
-                subject.get(new Bytes(stringSerializer.serialize(null, "3")))),
+                rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))),
             "c");
 
         entries.clear();
@@ -309,9 +322,9 @@
         entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
         entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
 
-        context.restore(subject.name(), entries);
+        context.restore(rocksDBStore.name(), entries);
 
-        final KeyValueIterator iterator = subject.all();
+        final KeyValueIterator iterator = rocksDBStore.all();
         final Set keys = new HashSet<>();
 
         while (iterator.hasNext()) {
@@ -325,57 +338,57 @@
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullPut() {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         try {
-            subject.put(null, stringSerializer.serialize(null, "someVal"));
+            rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
             fail("Should have thrown NullPointerException on null put()");
         } catch (NullPointerException e) { }
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullPutAll() {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         try {
-            subject.put(null, stringSerializer.serialize(null, "someVal"));
+            rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
             fail("Should have thrown NullPointerException on null put()");
         } catch (NullPointerException e) { }
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullGet() {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         try {
-            subject.get(null);
+            rocksDBStore.get(null);
             fail("Should have thrown NullPointerException on null get()");
         } catch (NullPointerException e) { }
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnDelete() {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         try {
-            subject.delete(null);
+            rocksDBStore.delete(null);
             fail("Should have thrown NullPointerException on deleting null key");
         } catch (NullPointerException e) { }
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnRange() {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         try {
-            subject.range(null, new Bytes(stringSerializer.serialize(null, "2")));
+            rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2")));
             fail("Should have thrown NullPointerException on deleting null key");
         } catch (NullPointerException e) { }
     }
 
     @Test(expected = ProcessorStateException.class)
     public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException {
-        subject.init(context, subject);
+        rocksDBStore.init(context, rocksDBStore);
         Utils.delete(dir);
-        subject.put(
+        rocksDBStore.put(
             new Bytes(stringSerializer.serialize(null, "anyKey")),
             stringSerializer.serialize(null, "anyValue"));
-        subject.flush();
+        rocksDBStore.flush();
     }
 
     public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 46606de..deb26f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -133,6 +133,12 @@
     }
 
     @Test
+    public void shouldGetCorrectSegmentString() {
+        final Segment segment = segments.getOrCreateSegment(0, context);
assertEquals("Segment(id=0, name=test.0)", segment.toString()); + } + + @Test public void shouldCloseAllOpenSegments() { final Segment first = segments.getOrCreateSegment(0, context); final Segment second = segments.getOrCreateSegment(1, context); -- To stop receiving notification emails like this one, please contact guozhang@apache.org.