kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-6487: ChangeLoggingKeyValueBytesStore does not propagate delete (#4495)
Date Fri, 02 Feb 2018 00:16:03 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 2940369  KAFKA-6487: ChangeLoggingKeyValueBytesStore does not propagate delete (#4495)
2940369 is described below

commit 2940369f28dfc6f21f8c13f660d39e831ae3d4d3
Author: bartdevylder <bartdevylder@gmail.com>
AuthorDate: Fri Feb 2 01:14:22 2018 +0100

    KAFKA-6487: ChangeLoggingKeyValueBytesStore does not propagate delete (#4495)
    
    The ChangeLoggingKeyValueBytesStore used to write null to its underlying store instead
of propagating the delete, which has two drawbacks:
    
    * an iterator will see null values
    * unbounded memory growth of the underlying in-memory keyvalue store
    
    The fix will just propagate the delete instead of performing put(key, null).
    
    The changes to the tests:
    
    * extra test to check whether the key is really gone after delete, by calling approximateNumEntries
on the underlying store. This number is exact because we know the underlying store in the
test is of type InMemoryKeyValueStore
    * extra test to check a delete is logged as <key, null> (the existing test would
also succeed if the key is just absent)
    
    While also updating the corresponding tests of ChangeLoggingKeyValueStore, I noticed
the class is no longer used anywhere, so I removed it from the source code for clarity.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bill@confluent.io>,
Matthias J. Sax <matthias@confluent.io>
---
 .../internals/ChangeLoggingKeyValueBytesStore.java |   4 +-
 .../internals/ChangeLoggingKeyValueStore.java      | 123 -----------
 .../state/internals/AbstractKeyValueStoreTest.java |  47 ++---
 .../ChangeLoggingKeyValueBytesStoreTest.java       |   4 +-
 .../internals/ChangeLoggingKeyValueStoreTest.java  | 225 ---------------------
 .../internals/InMemoryKeyValueLoggedStoreTest.java |  28 ++-
 6 files changed, 47 insertions(+), 384 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 8dc457a..94ee275 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -77,8 +77,8 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS
 
     @Override
     public byte[] delete(final Bytes key) {
-        final byte[] oldValue = inner.get(key);
-        put(key, null);
+        final byte[] oldValue = inner.delete(key);
+        changeLogger.logChange(key, null);
         return oldValue;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
deleted file mode 100644
index ea9f7aa..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
-
-import java.util.ArrayList;
-import java.util.List;
-
-class ChangeLoggingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore
implements KeyValueStore<K, V> {
-    private final ChangeLoggingKeyValueBytesStore innerBytes;
-    private final Serde keySerde;
-    private final Serde valueSerde;
-    private StateSerdes<K, V> serdes;
-
-
-    ChangeLoggingKeyValueStore(final KeyValueStore<Bytes, byte[]> bytesStore,
-                               final Serde keySerde,
-                               final Serde valueSerde) {
-        this(new ChangeLoggingKeyValueBytesStore(bytesStore), keySerde, valueSerde);
-    }
-
-    private ChangeLoggingKeyValueStore(final ChangeLoggingKeyValueBytesStore bytesStore,
-                                       final Serde keySerde,
-                                       final Serde valueSerde) {
-        super(bytesStore);
-        this.innerBytes = bytesStore;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public void init(final ProcessorContext context, final StateStore root) {
-        innerBytes.init(context, root);
-
-        serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
innerBytes.name()),
-                                   keySerde == null ? (Serde<K>) context.keySerde()
: keySerde,
-                                   valueSerde == null ? (Serde<V>) context.valueSerde()
: valueSerde);
-    }
-
-    @Override
-    public long approximateNumEntries() {
-        return innerBytes.approximateNumEntries();
-    }
-
-    @Override
-    public void put(final K key, final V value) {
-        final Bytes bytesKey = Bytes.wrap(serdes.rawKey(key));
-        final byte[] bytesValue = serdes.rawValue(value);
-        innerBytes.put(bytesKey, bytesValue);
-    }
-
-    @Override
-    public V putIfAbsent(final K key, final V value) {
-        final V v = get(key);
-        if (v == null) {
-            put(key, value);
-        }
-        return v;
-    }
-
-    @Override
-    public void putAll(final List<KeyValue<K, V>> entries) {
-        final List<KeyValue<Bytes, byte[]>> keyValues = new ArrayList<>();
-        for (final KeyValue<K, V> entry : entries) {
-            keyValues.add(KeyValue.pair(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)));
-        }
-        innerBytes.putAll(keyValues);
-    }
-
-    @Override
-    public V delete(final K key) {
-        final byte[] oldValue = innerBytes.delete(Bytes.wrap(serdes.rawKey(key)));
-        if (oldValue == null) {
-            return null;
-        }
-        return serdes.valueFrom(oldValue);
-    }
-
-    @Override
-    public V get(final K key) {
-        final byte[] rawValue = innerBytes.get(Bytes.wrap(serdes.rawKey(key)));
-        if (rawValue == null) {
-            return null;
-        }
-        return serdes.valueFrom(rawValue);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> range(final K from, final K to) {
-        return new SerializedKeyValueIterator<>(innerBytes.range(Bytes.wrap(serdes.rawKey(from)),
-                                                                 Bytes.wrap(serdes.rawKey(to))),
-                                                                 serdes);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> all() {
-        return new SerializedKeyValueIterator<>(innerBytes.all(), serdes);
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 65a9dec..398c4c5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -27,14 +27,14 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
 
 public abstract class AbstractKeyValueStoreTest {
 
-
     protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext
context,
                                                                       Class<K> keyClass,
Class<V> valueClass,
                                                                       boolean useContextSerdes);
@@ -58,6 +58,15 @@ public abstract class AbstractKeyValueStoreTest {
         driver.clear();
     }
 
+    private static Map<Integer, String> getContents(final KeyValueIterator<Integer,
String> iter) {
+        final HashMap<Integer, String> result = new HashMap<>();
+        while (iter.hasNext()) {
+            KeyValue<Integer, String> entry = iter.next();
+            result.put(entry.key, entry.value);
+        }
+        return result;
+    }
+
     @Test
     public void testPutGetRange() {
         // Verify that the store reads and writes correctly ...
@@ -74,6 +83,7 @@ public abstract class AbstractKeyValueStoreTest {
         assertEquals("four", store.get(4));
         assertEquals("five", store.get(5));
         store.delete(5);
+        assertEquals(4, driver.sizeOf(store));
 
         // Flush the store and verify all current entries were properly flushed ...
         store.flush();
@@ -89,31 +99,18 @@ public abstract class AbstractKeyValueStoreTest {
         assertEquals(false, driver.flushedEntryRemoved(4));
         assertEquals(true, driver.flushedEntryRemoved(5));
 
-        // Check range iteration ...
-        try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
-            while (iter.hasNext()) {
-                KeyValue<Integer, String> entry = iter.next();
-                if (entry.key.equals(2))
-                    assertEquals("two", entry.value);
-                else if (entry.key.equals(4))
-                    assertEquals("four", entry.value);
-                else
-                    fail("Unexpected entry: " + entry);
-            }
-        }
+        final HashMap<Integer, String> expectedContents = new HashMap<>();
+        expectedContents.put(2, "two");
+        expectedContents.put(4, "four");
 
         // Check range iteration ...
-        try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
-            while (iter.hasNext()) {
-                KeyValue<Integer, String> entry = iter.next();
-                if (entry.key.equals(2))
-                    assertEquals("two", entry.value);
-                else if (entry.key.equals(4))
-                    assertEquals("four", entry.value);
-                else
-                    fail("Unexpected entry: " + entry);
-            }
-        }
+        assertEquals(expectedContents, getContents(store.range(2, 4)));
+        assertEquals(expectedContents, getContents(store.range(2, 6)));
+
+        // Check all iteration ...
+        expectedContents.put(0, "zero");
+        expectedContents.put(1, "one");
+        assertEquals(expectedContents, getContents(store.all()));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index cf07927..9360dae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -109,9 +109,10 @@ public class ChangeLoggingKeyValueBytesStoreTest {
     }
 
     @Test
-    public void shouldPutNullOnDelete() {
+    public void shouldPropagateDelete() {
         store.put(hi, there);
         store.delete(hi);
+        assertThat(inner.approximateNumEntries(), equalTo(0L));
         assertThat(inner.get(hi), nullValue());
     }
 
@@ -125,6 +126,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
     public void shouldLogKeyNullOnDelete() {
         store.put(hi, there);
         store.delete(hi);
+        assertThat(sent.containsKey(hi), is(true));
         assertThat(sent.get(hi), nullValue());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
deleted file mode 100644
index 8190fd2..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.test.MockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-
-public class ChangeLoggingKeyValueStoreTest {
-
-    private MockProcessorContext context;
-    private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv",
Serdes.Bytes(), Serdes.ByteArray());
-    private final Serde<String> keySerde = Serdes.String();
-    private final Serde<String> valueSerde = Serdes.String();
-    private final ChangeLoggingKeyValueStore<String, String> store
-            = new ChangeLoggingKeyValueStore<>(inner, keySerde, valueSerde);
-    private final Map sent = new HashMap<>();
-    private final String hi = "hi";
-    private final Bytes hiBytes = Bytes.wrap(hi.getBytes());
-    private final String there = "there";
-    private final byte[] thereBytes = "there".getBytes();
-    private final String hello = "hello";
-    private final String world = "world";
-
-    @Before
-    public void before() {
-        final NoOpRecordCollector collector = new NoOpRecordCollector() {
-            @Override
-            public <K, V> void send(final String topic,
-                                    K key,
-                                    V value,
-                                    Integer partition,
-                                    Long timestamp,
-                                    Serializer<K> keySerializer,
-                                    Serializer<V> valueSerializer) {
-                sent.put(key, value);
-            }
-        };
-        context = new MockProcessorContext(
-            TestUtils.tempDirectory(),
-            Serdes.String(),
-            Serdes.Long(),
-            collector,
-            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
-        context.setTime(0);
-        store.init(context, store);
-    }
-
-    @After
-    public void after() {
-        context.close();
-        store.close();
-    }
-
-    @Test
-    public void shouldWriteKeyValueBytesToInnerStoreOnPut() {
-        store.put(hi, there);
-        assertThat(deserializedValueFromInner(hi), equalTo(there));
-    }
-
-    @Test
-    public void shouldLogChangeOnPut() {
-        store.put(hi, there);
-        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldWriteAllKeyValueToInnerStoreOnPutAll() {
-        store.putAll(Arrays.asList(KeyValue.pair(hello, world),
-                                   KeyValue.pair(hi, there)));
-        assertThat(deserializedValueFromInner(hello), equalTo(world));
-        assertThat(deserializedValueFromInner(hi), equalTo(there));
-    }
-
-    @Test
-    public void shouldLogChangesOnPutAll() {
-        store.putAll(Arrays.asList(KeyValue.pair(hi, there),
-                                   KeyValue.pair(hello, world)));
-        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
-        assertThat((byte[]) sent.get(Bytes.wrap(hello.getBytes())), equalTo(world.getBytes()));
-    }
-
-    @Test
-    public void shouldPutNullOnDelete() {
-        store.put(hi, there);
-        store.delete(hi);
-        assertThat(inner.get(hiBytes), nullValue());
-    }
-
-    @Test
-    public void shouldReturnOldValueOnDelete() {
-        store.put(hi, there);
-        assertThat(store.delete(hi), equalTo(there));
-    }
-
-    @Test
-    public void shouldReturnNullOnDeleteIfNoOldValue() {
-        assertThat(store.delete(hi), is(nullValue()));
-    }
-
-    @Test
-    public void shouldLogKeyNullOnDelete() {
-        store.put(hi, there);
-        store.delete(hi);
-        assertThat(sent.get(hi), nullValue());
-    }
-
-    @Test
-    public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() {
-        store.putIfAbsent(hi, there);
-        assertThat(inner.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() {
-        store.put(hi, there);
-        store.putIfAbsent(hi, world);
-        assertThat(inner.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() {
-        store.putIfAbsent(hi, there);
-        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() {
-        store.put(hi, there);
-        store.putIfAbsent(hi, world);
-        assertThat((byte[]) sent.get(hiBytes), equalTo(thereBytes));
-    }
-
-    @Test
-    public void shouldReturnCurrentValueOnPutIfAbsent() {
-        store.put(hi, there);
-        assertThat(store.putIfAbsent(hi, world), equalTo(there));
-    }
-
-    @Test
-    public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() {
-        assertThat(store.putIfAbsent(hi, there), is(nullValue()));
-    }
-
-    @Test
-    public void shouldQueryRange() {
-        store.put(hello, world);
-        store.put(hi, there);
-        store.put("zooom", "home");
-        final KeyValueIterator<String, String> range = store.range(hello, hi);
-        assertThat(range.next(), equalTo(KeyValue.pair(hello, world)));
-        assertThat(range.next(), equalTo(KeyValue.pair(hi, there)));
-        assertFalse(range.hasNext());
-    }
-
-    @Test
-    public void shouldReturnAllKeyValues() {
-        store.put(hello, world);
-        store.put(hi, there);
-        final String zooom = "zooom";
-        final String home = "home";
-        store.put(zooom, home);
-        final KeyValueIterator<String, String> all = store.all();
-        assertThat(all.next(), equalTo(KeyValue.pair(hello, world)));
-        assertThat(all.next(), equalTo(KeyValue.pair(hi, there)));
-        assertThat(all.next(), equalTo(KeyValue.pair(zooom, home)));
-        assertFalse(all.hasNext());
-    }
-
-    @Test
-    public void shouldReturnValueOnGetWhenExists() {
-        store.put(hello, world);
-        assertThat(store.get(hello), equalTo(world));
-    }
-
-    @Test
-    public void shouldReturnNullOnGetWhenDoesntExist() {
-        assertThat(store.get(hello), is(nullValue()));
-    }
-
-    @Test
-    public void shouldReturnInnerStoreName() {
-        assertThat(store.name(), equalTo("kv"));
-    }
-
-    private String deserializedValueFromInner(final String key) {
-        return valueSerde.deserializer().deserialize("blah", inner.get(Bytes.wrap(key.getBytes())));
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index 3607c9e..d24a90f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
 
@@ -39,18 +42,27 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest
{
             Class<V> valueClass,
             boolean useContextSerdes) {
 
-        StateStoreSupplier supplier;
+        final Serde<K> keySerde;
+        final Serde<V> valueSerde;
+
         if (useContextSerdes) {
-            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde())
-                .inMemory().enableLogging(Collections.singletonMap("retention.ms", "1000")).build();
+            keySerde = (Serde<K>) context.keySerde();
+            valueSerde = (Serde<V>) context.valueSerde();
         } else {
-            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass)
-                .inMemory().enableLogging(Collections.singletonMap("retention.ms", "1000")).build();
+            keySerde = Serdes.serdeFrom(keyClass);
+            valueSerde = Serdes.serdeFrom(valueClass);
         }
 
-        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("my-store"),
+                keySerde,
+                valueSerde)
+                .withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
+
+        final StateStore store = storeBuilder.build();
         store.init(context, store);
-        return store;
+
+        return (KeyValueStore<K, V>) store;
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message