Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EACBB18EAA for ; Tue, 2 Feb 2016 00:11:27 +0000 (UTC) Received: (qmail 52151 invoked by uid 500); 2 Feb 2016 00:11:27 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 52025 invoked by uid 500); 2 Feb 2016 00:11:27 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 52008 invoked by uid 99); 2 Feb 2016 00:11:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Feb 2016 00:11:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 96269DFCE4; Tue, 2 Feb 2016 00:11:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ewencp@apache.org To: commits@kafka.apache.org Date: Tue, 02 Feb 2016 00:11:27 -0000 Message-Id: <3b8b4d7780b148b999a7da2c3286000a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] kafka git commit: KAFKA-3060: Refactor MeteredStore and RockDBStore Impl Repository: kafka Updated Branches: refs/heads/trunk 66ecf3f08 -> 57da044a9 http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index d854c92..afb0f09 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -22,11 +22,13 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.Serdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -import org.apache.kafka.streams.state.WindowStoreUtil; +import org.apache.kafka.streams.state.WindowStoreUtils; + import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -46,7 +48,7 @@ public class RocksDBWindowStore implements WindowStore { public final long id; Segment(String name, long id) { - super(name, WindowStoreUtil.INNER_SERDES); + super(name, WindowStoreUtils.INNER_SERDES); this.id = id; } @@ -61,7 +63,7 @@ public class RocksDBWindowStore implements WindowStore { private int index = 0; RocksDBWindowStoreIterator(Serdes serdes) { - this(serdes, WindowStoreUtil.NO_ITERATORS); + this(serdes, WindowStoreUtils.NO_ITERATORS); } RocksDBWindowStoreIterator(Serdes serdes, KeyValueIterator[] iterators) { @@ -87,7 +89,7 @@ public class RocksDBWindowStore implements WindowStore { KeyValue kv = iterators[index].next(); - return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(kv.key), + return new KeyValue<>(WindowStoreUtils.timestampFromBinaryKey(kv.key), serdes.valueFrom(kv.value)); } @@ -111,10 +113,14 @@ public class RocksDBWindowStore implements WindowStore { private final Segment[] segments; private final Serdes serdes; private final SimpleDateFormat formatter; + private final StoreChangeLogger.ValueGetter getter; private ProcessorContext context; - private long currentSegmentId = -1L; private int seqnum = 0; + private long currentSegmentId = -1L; + + private boolean loggingEnabled = false; + private StoreChangeLogger changeLogger = null; public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes serdes) { this.name = name; @@ -127,11 +133,23 @@ public class RocksDBWindowStore implements WindowStore { this.retainDuplicates = retainDuplicates; + this.getter = new StoreChangeLogger.ValueGetter() { + public byte[] get(byte[] key) { + return getInternal(key); + } + }; + // Create a date formatter. Formatted timestamps are used as segment name suffixes this.formatter = new SimpleDateFormat("yyyyMMddHHmm"); this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT")); } + public RocksDBWindowStore enableLogging() { + loggingEnabled = true; + + return this; + } + @Override public String name() { return name; @@ -140,6 +158,17 @@ public class RocksDBWindowStore implements WindowStore { @Override public void init(ProcessorContext context) { this.context = context; + + this.changeLogger = this.loggingEnabled ? + new RawStoreChangeLogger(name, context) : null; + + // register and possibly restore the state from the logs + context.register(this, loggingEnabled, new StateRestoreCallback() { + @Override + public void restore(byte[] key, byte[] value) { + putInternal(key, value); + } + }); } @Override @@ -153,6 +182,9 @@ public class RocksDBWindowStore implements WindowStore { if (segment != null) segment.flush(); } + + if (loggingEnabled) + changeLogger.logChange(this.getter); } @Override @@ -165,16 +197,25 @@ public class RocksDBWindowStore implements WindowStore { @Override public void put(K key, V value) { - putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP); + byte[] rawKey = putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP); + + if (loggingEnabled) { + changeLogger.add(rawKey); + changeLogger.maybeLogChange(this.getter); + } } @Override public void put(K key, V value, long timestamp) { - putAndReturnInternalKey(key, value, timestamp); + byte[] rawKey = putAndReturnInternalKey(key, value, timestamp); + + if (loggingEnabled) { + changeLogger.add(rawKey); + changeLogger.maybeLogChange(this.getter); + } } - @Override - public byte[] putAndReturnInternalKey(K key, V value, long t) { + private byte[] putAndReturnInternalKey(K key, V value, long t) { long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t; long segmentId = segmentId(timestamp); @@ -189,7 +230,7 @@ public class RocksDBWindowStore implements WindowStore { if (segmentId > currentSegmentId - segments.length) { if (retainDuplicates) seqnum = (seqnum + 1) & 0x7FFFFFFF; - byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes); + byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes); getSegment(segmentId).put(binaryKey, serdes.rawValue(value)); return binaryKey; } else { @@ -197,9 +238,8 @@ public class RocksDBWindowStore implements WindowStore { } } - @Override - public void putInternal(byte[] binaryKey, byte[] binaryValue) { - long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey)); + private void putInternal(byte[] binaryKey, byte[] binaryValue) { + long segmentId = segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey)); if (segmentId > currentSegmentId) { // A new segment will be created. Clean up old segments first. @@ -212,9 +252,8 @@ public class RocksDBWindowStore implements WindowStore { getSegment(segmentId).put(binaryKey, binaryValue); } - @Override - public byte[] getInternal(byte[] binaryKey) { - long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey)); + private byte[] getInternal(byte[] binaryKey) { + long segmentId = segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey)); Segment segment = segments[(int) (segmentId % segments.length)]; @@ -231,8 +270,8 @@ public class RocksDBWindowStore implements WindowStore { long segFrom = segmentId(timeFrom); long segTo = segmentId(Math.max(0L, timeTo)); - byte[] binaryFrom = WindowStoreUtil.toBinaryKey(key, timeFrom, 0, serdes); - byte[] binaryUntil = WindowStoreUtil.toBinaryKey(key, timeTo + 1L, 0, serdes); + byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes); + byte[] binaryUntil = WindowStoreUtils.toBinaryKey(key, timeTo + 1L, 0, serdes); ArrayList> iterators = new ArrayList<>(); @@ -271,15 +310,16 @@ public class RocksDBWindowStore implements WindowStore { } } - public long segmentId(long timestamp) { + private long segmentId(long timestamp) { return timestamp / segmentInterval; } + // this method is defined public since it is used for unit tests public String directorySuffix(long segmentId) { return formatter.format(new Date(segmentId * segmentInterval)); } - // this method is used by a test + // this method is defined public since it is used for unit tests public Set segmentIds() { HashSet segmentIds = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java index fa85ce9..6823e6d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java @@ -36,7 +36,7 @@ public class RocksDBWindowStoreSupplier implements StateStoreSupplier { private final long retentionPeriod; private final boolean retainDuplicates; private final int numSegments; - private final Serdes serdes; + private final Serdes serdes; private final Time time; public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes serdes, Time time) { @@ -53,7 +53,7 @@ public class RocksDBWindowStoreSupplier implements StateStoreSupplier { } public StateStore get() { - return new MeteredWindowStore<>(new RocksDBWindowStore(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time); + return new MeteredWindowStore<>(new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, serdes).enableLogging(), "rocksdb-window", time); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java index da5544c..b330334 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java @@ -32,28 +32,41 @@ public class StoreChangeLogger { V get(K key); } + // TODO: these values should be configurable + protected static final int DEFAULT_WRITE_BATCH_SIZE = 100; + protected final Serdes serialization; - private final Set dirty; - private final Set removed; + private final String topic; + private final int partition; + private final ProcessorContext context; private final int maxDirty; private final int maxRemoved; - private final String topic; - private int partition; - private ProcessorContext context; + protected Set dirty; + protected Set removed; - // always wrap the logged store with the metered store public StoreChangeLogger(String topic, ProcessorContext context, Serdes serialization) { + this(topic, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE); + } + + public StoreChangeLogger(String topic, ProcessorContext context, Serdes serialization, int maxDirty, int maxRemoved) { + this(topic, context, context.id().partition, serialization, maxDirty, maxRemoved); + init(); + } + + protected StoreChangeLogger(String topic, ProcessorContext context, int partition, Serdes serialization, int maxDirty, int maxRemoved) { this.topic = topic; - this.serialization = serialization; this.context = context; - this.partition = context.id().partition; + this.partition = partition; + this.serialization = serialization; + this.maxDirty = maxDirty; + this.maxRemoved = maxRemoved; + } + public void init() { this.dirty = new HashSet<>(); this.removed = new HashSet<>(); - this.maxDirty = 100; // TODO: this needs to be configurable - this.maxRemoved = 100; // TODO: this needs to be configurable } public void add(K key) { @@ -89,4 +102,18 @@ public class StoreChangeLogger { } } + public void clear() { + this.removed.clear(); + this.dirty.clear(); + } + + // this is for test only + public int numDirty() { + return this.dirty.size(); + } + + // this is for test only + public int numRemoved() { + return this.removed.size(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index cb6ea05..40cce93 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -37,7 +37,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StateUtils; +import org.apache.kafka.streams.state.StateTestUtils; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.ProcessorTopologyTestDriver; @@ -65,7 +65,7 @@ public class ProcessorTopologyTest { @Before public void setup() { // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ... - File localState = StateUtils.tempDir(); + File localState = StateTestUtils.tempDir(); Properties props = new Properties(); props.setProperty(StreamsConfig.JOB_ID_CONFIG, "processor-topology-test"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 1e9c3ba..daa7201 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -219,7 +219,6 @@ public class KeyValueStoreTestDriver { return new KeyValueStoreTestDriver(serdes); } - private final Serdes serdes; private final Map flushedEntries = new HashMap<>(); private final Set flushedRemovals = new HashSet<>(); private final List> restorableEntries = new LinkedList<>(); @@ -238,22 +237,40 @@ public class KeyValueStoreTestDriver { private final RecordCollector recordCollector; private File stateDir = null; - protected KeyValueStoreTestDriver(Serdes serdes) { - this.serdes = serdes; + protected KeyValueStoreTestDriver(final Serdes serdes) { ByteArraySerializer rawSerializer = new ByteArraySerializer(); - Producer producer = new MockProducer(true, rawSerializer, rawSerializer); + Producer producer = new MockProducer<>(true, rawSerializer, rawSerializer); + this.recordCollector = new RecordCollector(producer) { + @SuppressWarnings("unchecked") @Override public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer) { - recordFlushed(record.key(), record.value()); + // for byte arrays we need to wrap it for comparison + + K key; + if (record.key() instanceof byte[]) { + key = serdes.keyFrom((byte[]) record.key()); + } else { + key = (K) record.key(); + } + + V value; + if (record.key() instanceof byte[]) { + value = serdes.valueFrom((byte[]) record.value()); + } else { + value = (V) record.value(); + } + + recordFlushed(key, value); } @Override public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer, StreamPartitioner partitioner) { - recordFlushed(record.key(), record.value()); + // ignore partitioner + send(record, keySerializer, valueSerializer); } }; - this.stateDir = StateUtils.tempDir(); + this.stateDir = StateTestUtils.tempDir(); this.stateDir.mkdirs(); Properties props = new Properties(); @@ -279,7 +296,7 @@ public class KeyValueStoreTestDriver { @Override public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) { storeMap.put(store.name(), store); - restoreEntries(func); + restoreEntries(func, serdes); } @Override @@ -299,21 +316,19 @@ public class KeyValueStoreTestDriver { }; } - @SuppressWarnings("unchecked") - protected void recordFlushed(K1 key, V1 value) { - K k = (K) key; + protected void recordFlushed(K key, V value) { if (value == null) { // This is a removal ... - flushedRemovals.add(k); - flushedEntries.remove(k); + flushedRemovals.add(key); + flushedEntries.remove(key); } else { // This is a normal add - flushedEntries.put(k, (V) value); - flushedRemovals.remove(k); + flushedEntries.put(key, value); + flushedRemovals.remove(key); } } - private void restoreEntries(StateRestoreCallback func) { + private void restoreEntries(StateRestoreCallback func, Serdes serdes) { for (KeyValue entry : restorableEntries) { if (entry != null) { byte[] rawKey = serdes.rawKey(entry.key); @@ -440,6 +455,13 @@ public class KeyValueStoreTestDriver { } /** + * Return number of removed entry + */ + public int numFlushedEntryRemoved() { + return flushedRemovals.size(); + } + + /** * Remove all {@link #flushedEntryStored(Object) flushed entries}, {@link #flushedEntryRemoved(Object) flushed removals}, */ public void clear() { http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java new file mode 100644 index 0000000..70e6cf6 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A utility for tests to create and manage unique and isolated directories on the file system for local state. + */ +public class StateTestUtils { + + private static final AtomicLong INSTANCE_COUNTER = new AtomicLong(); + + /** + * Create a new temporary directory that will be cleaned up automatically upon shutdown. + * @return the new directory that will exist; never null + */ + public static File tempDir() { + try { + final File dir = Files.createTempDirectory("test").toFile(); + dir.mkdirs(); + dir.deleteOnExit(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + deleteDirectory(dir); + } + }); + return dir; + } catch (IOException ex) { + throw new RuntimeException("Failed to create a temp dir", ex); + } + } + + private static void deleteDirectory(File dir) { + if (dir != null && dir.exists()) { + try { + Files.walkFileTree(dir.toPath(), new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + Files.delete(file); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + Files.delete(dir); + return FileVisitResult.CONTINUE; + } + + }); + } catch (IOException e) { + // do nothing + } + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java deleted file mode 100644 index c014ae5..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state; - -import java.io.File; -import java.io.IOException; -import java.nio.file.FileVisitResult; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.concurrent.atomic.AtomicLong; - -/** - * A utility for tests to create and manage unique and isolated directories on the file system for local state. - */ -public class StateUtils { - - private static final AtomicLong INSTANCE_COUNTER = new AtomicLong(); - - /** - * Create a new temporary directory that will be cleaned up automatically upon shutdown. - * @return the new directory that will exist; never null - */ - public static File tempDir() { - try { - final File dir = Files.createTempDirectory("test").toFile(); - dir.mkdirs(); - dir.deleteOnExit(); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - deleteDirectory(dir); - } - }); - return dir; - } catch (IOException ex) { - throw new RuntimeException("Failed to create a temp dir", ex); - } - } - - private static void deleteDirectory(File dir) { - if (dir != null && dir.exists()) { - try { - Files.walkFileTree(dir.toPath(), new SimpleFileVisitor() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - Files.delete(file); - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { - Files.delete(dir); - return FileVisitResult.CONTINUE; - } - - }); - } catch (IOException e) { - // do nothing - } - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 8effd77..ee343e8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -37,7 +37,7 @@ public abstract class AbstractKeyValueStoreTest { @Test public void testPutGetRange() { // Create the test driver ... - KeyValueStoreTestDriver driver = KeyValueStoreTestDriver.create(); + KeyValueStoreTestDriver driver = KeyValueStoreTestDriver.create(Integer.class, String.class); KeyValueStore store = createKeyValueStore(driver.context(), Integer.class, String.class, false); try { http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java index d7cc5b9..10f31d6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java @@ -16,144 +16,95 @@ */ package org.apache.kafka.streams.state.internals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStoreTestDriver; import org.apache.kafka.streams.state.Stores; import org.junit.Test; -public class InMemoryLRUCacheStoreTest { - - @SuppressWarnings("unchecked") - @Test - public void testPutGetRange() { - // Create the test driver ... - KeyValueStoreTestDriver driver = KeyValueStoreTestDriver.create(); - StateStoreSupplier supplier = Stores.create("my-store") - .withIntegerKeys().withStringValues() - .inMemory().maxEntries(3) - .build(); - KeyValueStore store = (KeyValueStore) supplier.get(); - store.init(driver.context()); - - // Verify that the store reads and writes correctly, keeping only the last 2 entries ... - store.put(0, "zero"); - store.put(1, "one"); - store.put(2, "two"); - store.put(3, "three"); - store.put(4, "four"); - store.put(5, "five"); - - // It should only keep the last 4 added ... - assertEquals(3, driver.sizeOf(store)); - assertNull(store.get(0)); - assertNull(store.get(1)); - assertNull(store.get(2)); - assertEquals("three", store.get(3)); - assertEquals("four", store.get(4)); - assertEquals("five", store.get(5)); - store.delete(5); - - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertNull(driver.flushedEntryStored(0)); - assertNull(driver.flushedEntryStored(1)); - assertNull(driver.flushedEntryStored(2)); - assertEquals("three", driver.flushedEntryStored(3)); - assertEquals("four", driver.flushedEntryStored(4)); - assertNull(driver.flushedEntryStored(5)); +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; - assertEquals(true, driver.flushedEntryRemoved(0)); - assertEquals(true, driver.flushedEntryRemoved(1)); - assertEquals(true, driver.flushedEntryRemoved(2)); - assertEquals(false, driver.flushedEntryRemoved(3)); - assertEquals(false, driver.flushedEntryRemoved(4)); - assertEquals(true, driver.flushedEntryRemoved(5)); - } +public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest { @SuppressWarnings("unchecked") - @Test - public void testPutGetRangeWithDefaultSerdes() { - // Create the test driver ... - KeyValueStoreTestDriver driver = KeyValueStoreTestDriver.create(); - - Serializer keySer = (Serializer) driver.context().keySerializer(); - Deserializer keyDeser = (Deserializer) driver.context().keyDeserializer(); - Serializer valSer = (Serializer) driver.context().valueSerializer(); - Deserializer valDeser = (Deserializer) driver.context().valueDeserializer(); - StateStoreSupplier supplier = Stores.create("my-store") - .withKeys(keySer, keyDeser) - .withValues(valSer, valDeser) - .inMemory().maxEntries(3) - .build(); - KeyValueStore store = (KeyValueStore) supplier.get(); - store.init(driver.context()); - - // Verify that the store reads and writes correctly, keeping only the last 2 entries ... - store.put(0, "zero"); - store.put(1, "one"); - store.put(2, "two"); - store.put(3, "three"); - store.put(4, "four"); - store.put(5, "five"); - - // It should only keep the last 4 added ... - assertEquals(3, driver.sizeOf(store)); - assertNull(store.get(0)); - assertNull(store.get(1)); - assertNull(store.get(2)); - assertEquals("three", store.get(3)); - assertEquals("four", store.get(4)); - assertEquals("five", store.get(5)); - store.delete(5); - - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertNull(driver.flushedEntryStored(0)); - assertNull(driver.flushedEntryStored(1)); - assertNull(driver.flushedEntryStored(2)); - assertEquals("three", driver.flushedEntryStored(3)); - assertEquals("four", driver.flushedEntryStored(4)); - assertNull(driver.flushedEntryStored(5)); - - assertEquals(true, driver.flushedEntryRemoved(0)); - assertEquals(true, driver.flushedEntryRemoved(1)); - assertEquals(true, driver.flushedEntryRemoved(2)); - assertEquals(false, driver.flushedEntryRemoved(3)); - assertEquals(false, driver.flushedEntryRemoved(4)); - assertEquals(true, driver.flushedEntryRemoved(5)); + @Override + protected KeyValueStore createKeyValueStore( + ProcessorContext context, + Class keyClass, + Class valueClass, + boolean useContextSerdes) { + + StateStoreSupplier supplier; + if (useContextSerdes) { + Serializer keySer = (Serializer) context.keySerializer(); + Deserializer keyDeser = (Deserializer) context.keyDeserializer(); + Serializer valSer = (Serializer) context.valueSerializer(); + Deserializer valDeser = (Deserializer) context.valueDeserializer(); + supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().maxEntries(10).build(); + } else { + supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().maxEntries(10).build(); + } + + KeyValueStore store = (KeyValueStore) supplier.get(); + store.init(context); + return store; } @Test - public void testRestore() { + public void testEvict() { // Create the test driver ... KeyValueStoreTestDriver driver = KeyValueStoreTestDriver.create(Integer.class, String.class); - - // Add any entries that will be restored to any store - // that uses the driver's context ... - driver.addEntryToRestoreLog(1, "one"); - driver.addEntryToRestoreLog(2, "two"); - driver.addEntryToRestoreLog(4, "four"); - - // Create the store, which should register with the context and automatically - // receive the restore entries ... - StateStoreSupplier supplier = Stores.create("my-store") - .withIntegerKeys().withStringValues() - .inMemory().maxEntries(3) - .build(); - KeyValueStore store = (KeyValueStore) supplier.get(); - store.init(driver.context()); - - // Verify that the store's contents were properly restored ... - assertEquals(0, driver.checkForRestoredEntries(store)); - - // and there are no other entries ... - assertEquals(3, driver.sizeOf(store)); + KeyValueStore store = createKeyValueStore(driver.context(), Integer.class, String.class, false); + + try { + store.put(0, "zero"); + store.put(1, "one"); + store.put(2, "two"); + store.put(3, "three"); + store.put(4, "four"); + store.put(5, "five"); + store.put(6, "six"); + store.put(7, "seven"); + store.put(8, "eight"); + store.put(9, "nine"); + assertEquals(10, driver.sizeOf(store)); + + store.put(10, "ten"); + store.flush(); + assertEquals(10, driver.sizeOf(store)); + assertTrue(driver.flushedEntryRemoved(0)); + assertEquals(1, driver.numFlushedEntryRemoved()); + + store.delete(1); + store.flush(); + assertEquals(9, driver.sizeOf(store)); + assertTrue(driver.flushedEntryRemoved(0)); + assertTrue(driver.flushedEntryRemoved(1)); + assertEquals(2, driver.numFlushedEntryRemoved()); + + store.put(11, "eleven"); + store.flush(); + assertEquals(10, driver.sizeOf(store)); + assertEquals(2, driver.numFlushedEntryRemoved()); + + store.put(2, "two-again"); + store.flush(); + assertEquals(10, driver.sizeOf(store)); + assertEquals(2, driver.numFlushedEntryRemoved()); + + store.put(12, "twelve"); + store.flush(); + assertEquals(10, driver.sizeOf(store)); + assertTrue(driver.flushedEntryRemoved(0)); + assertTrue(driver.flushedEntryRemoved(1)); + assertTrue(driver.flushedEntryRemoved(2)); + assertEquals(3, driver.numFlushedEntryRemoved()); + } finally { + store.close(); + } } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java new file mode 100644 index 0000000..352e330 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class OffsetCheckpointTest { + + private final String topic = "topic"; + + private OffsetCheckpoint checkpoint = null; + + @Test + public void testReadWrite() throws IOException { + File f = new File("/tmp/kafka-streams/offset_checkpoint.test"); + checkpoint = new OffsetCheckpoint(f); + + try { + Map offsets = new HashMap<>(); + offsets.put(new TopicPartition(topic, 0), 0L); + offsets.put(new TopicPartition(topic, 1), 1L); + offsets.put(new TopicPartition(topic, 2), 2L); + + checkpoint.write(offsets); + assertEquals(offsets, checkpoint.read()); + + checkpoint.delete(); + assertFalse(f.exists()); + + offsets.put(new TopicPartition(topic, 3), 3L); + checkpoint.write(offsets); + assertEquals(offsets, checkpoint.read()); + } finally { + checkpoint.delete(); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java index 29a3c0a..b9703db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java @@ -39,9 +39,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { Deserializer keyDeser = (Deserializer) context.keyDeserializer(); Serializer valSer = (Serializer) context.valueSerializer(); Deserializer valDeser = (Deserializer) context.valueDeserializer(); - supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build(); + supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).persistent().build(); } else { - supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build(); + supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).persistent().build(); } KeyValueStore store = (KeyValueStore) supplier.get(); http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 45448e5..94385c8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.Serdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -import org.apache.kafka.streams.state.WindowStoreUtil; +import org.apache.kafka.streams.state.WindowStoreUtils; import org.apache.kafka.test.MockProcessorContext; import org.junit.Test; @@ -58,8 +58,10 @@ public class RocksDBWindowStoreTest { private final long windowSize = 3; private final Serdes serdes = Serdes.withBuiltinTypes("", Integer.class, String.class); + @SuppressWarnings("unchecked") protected WindowStore createWindowStore(ProcessorContext context, Serdes serdes) { StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null); + WindowStore store = (WindowStore) supplier.get(); store.init(context); return store; @@ -659,8 +661,8 @@ public class RocksDBWindowStoreTest { HashMap> entriesByKey = new HashMap<>(); for (KeyValue entry : changeLog) { - long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key); - Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key, serdes); + long timestamp = WindowStoreUtils.timestampFromBinaryKey(entry.key); + Integer key = WindowStoreUtils.keyFromBinaryKey(entry.key, serdes); String value = entry.value == null ? null : serdes.valueFrom(entry.value); Set entries = entriesByKey.get(key); http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java new file mode 100644 index 0000000..5f014ef --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.state.internals; + + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.state.Serdes; +import org.apache.kafka.test.MockProcessorContext; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class StoreChangeLoggerTest { + + private final String topic = "topic"; + + private final Map logged = new HashMap<>(); + private final Map written = new HashMap<>(); + + private final ProcessorContext context = new MockProcessorContext(Serdes.withBuiltinTypes(topic, Integer.class, String.class), + new RecordCollector(null) { + @SuppressWarnings("unchecked") + @Override + public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer) { + logged.put((Integer) record.key(), (String) record.value()); + } + + @Override + public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer, + StreamPartitioner partitioner) { + // ignore partitioner + send(record, keySerializer, valueSerializer); + } + } + ); + + private final StoreChangeLogger changeLogger = new StoreChangeLogger<>(topic, context, Serdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3); + + private final StoreChangeLogger rawChangeLogger = new RawStoreChangeLogger(topic, context, 3, 3); + + private final StoreChangeLogger.ValueGetter getter = new StoreChangeLogger.ValueGetter() { + @Override + public String get(Integer key) { + return written.get(key); + } + }; + + private final StoreChangeLogger.ValueGetter rawGetter = new StoreChangeLogger.ValueGetter() { + private IntegerDeserializer deserializer = new IntegerDeserializer(); + private StringSerializer serializer = new StringSerializer(); + + @Override + public byte[] get(byte[] key) { + return serializer.serialize(topic, written.get(deserializer.deserialize(topic, key))); + } + }; + + @Test + public void testAddRemove() { + written.put(0, "zero"); + changeLogger.add(0); + written.put(1, "one"); + changeLogger.add(1); + written.put(2, "two"); + changeLogger.add(2); + assertEquals(3, changeLogger.numDirty()); + assertEquals(0, changeLogger.numRemoved()); + + changeLogger.delete(0); + changeLogger.delete(1); + written.put(3, "three"); + changeLogger.add(3); + assertEquals(2, changeLogger.numDirty()); + assertEquals(2, changeLogger.numRemoved()); + + written.put(0, "zero-again"); + changeLogger.add(0); + assertEquals(3, changeLogger.numDirty()); + assertEquals(1, changeLogger.numRemoved()); + + written.put(4, "four"); + changeLogger.add(4); + changeLogger.maybeLogChange(getter); + assertEquals(0, changeLogger.numDirty()); + assertEquals(0, changeLogger.numRemoved()); + assertEquals(5, logged.size()); + assertEquals("zero-again", logged.get(0)); + assertEquals(null, logged.get(1)); + assertEquals("two", logged.get(2)); + assertEquals("three", logged.get(3)); + assertEquals("four", logged.get(4)); + } + + @Test + public void testRaw() { + IntegerSerializer serializer = new IntegerSerializer(); + + written.put(0, "zero"); + rawChangeLogger.add(serializer.serialize(topic, 0)); + written.put(1, "one"); + rawChangeLogger.add(serializer.serialize(topic, 1)); + written.put(2, "two"); + rawChangeLogger.add(serializer.serialize(topic, 2)); + assertEquals(3, rawChangeLogger.numDirty()); + assertEquals(0, rawChangeLogger.numRemoved()); + + rawChangeLogger.delete(serializer.serialize(topic, 0)); + rawChangeLogger.delete(serializer.serialize(topic, 1)); + written.put(3, "three"); + rawChangeLogger.add(serializer.serialize(topic, 3)); + assertEquals(2, rawChangeLogger.numDirty()); + assertEquals(2, rawChangeLogger.numRemoved()); + + written.put(0, "zero-again"); + rawChangeLogger.add(serializer.serialize(topic, 0)); + assertEquals(3, rawChangeLogger.numDirty()); + assertEquals(1, rawChangeLogger.numRemoved()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index cb7a95c..31b8335 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.state.Serdes; import java.io.File; import java.util.Collections; @@ -49,6 +50,16 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S long timestamp = -1L; + public MockProcessorContext(Serdes serdes, RecordCollector collector) { + this(null, null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(), serdes.valueDeserializer(), collector); + } + + public MockProcessorContext(Serializer keySerializer, Deserializer keyDeserializer, + Serializer valueSerializer, Deserializer valueDeserializer, + RecordCollector collector) { + this(null, null, keySerializer, keyDeserializer, valueSerializer, valueDeserializer, collector); + } + public MockProcessorContext(KStreamTestDriver driver, File stateDir, Serializer keySerializer, Deserializer keyDeserializer, Serializer valueSerializer, Deserializer valueDeserializer, http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java index 6810841..73d446f 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java @@ -49,9 +49,9 @@ public class MockStateStoreSupplier implements StateStoreSupplier { @Override public StateStore get() { if (loggingEnabled) { - return new MockStateStore(name, persistent); + return new MockStateStore(name, persistent).enableLogging(); } else { - return new MockStateStore(name, persistent).disableLogging(); + return new MockStateStore(name, persistent); } } @@ -59,7 +59,7 @@ public class MockStateStoreSupplier implements StateStoreSupplier { private final String name; private final boolean persistent; - public boolean loggingEnabled = true; + public boolean loggingEnabled = false; public boolean initialized = false; public boolean flushed = false; public boolean closed = false; @@ -70,8 +70,8 @@ public class MockStateStoreSupplier implements StateStoreSupplier { this.persistent = persistent; } - public MockStateStore disableLogging() { - loggingEnabled = false; + public MockStateStore enableLogging() { + loggingEnabled = true; return this; }