Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 07D5A186DA for ; Thu, 21 Jan 2016 00:10:48 +0000 (UTC) Received: (qmail 17772 invoked by uid 500); 21 Jan 2016 00:10:47 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 17733 invoked by uid 500); 21 Jan 2016 00:10:47 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 17568 invoked by uid 99); 21 Jan 2016 00:10:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Jan 2016 00:10:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A24B9DFF94; Thu, 21 Jan 2016 00:10:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Date: Thu, 21 Jan 2016 00:10:48 -0000 Message-Id: <6966e0f206654a038452ae676fe99a39@git.apache.org> In-Reply-To: <2ce1e6fcacd74ee29815a594fa388722@git.apache.org> References: <2ce1e6fcacd74ee29815a594fa388722@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/5] kafka git commit: KAFKA-3121: Remove aggregatorSupplier and add Reduce functions http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java deleted file mode 100644 index fc7a4e9..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java +++ /dev/null @@ -1,671 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.state; - -import org.apache.kafka.clients.producer.MockProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStoreSupplier; -import org.apache.kafka.streams.processor.internals.RecordCollector; -import org.apache.kafka.test.MockProcessorContext; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -public class RocksDBWindowStoreTest { - - private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer(); - private final ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer(); - private final int numSegments = 3; - private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL; - private final long retentionPeriod = segmentSize * (numSegments - 1); - private final long windowSize = 3; - private final Serdes serdes = Serdes.withBuiltinTypes("", Integer.class, String.class); - - protected WindowStore createWindowStore(ProcessorContext context, Serdes serdes) { - StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null); - WindowStore store = (WindowStore) supplier.get(); - store.init(context); - return store; - } - - @Test - public void testPutAndFetch() throws IOException { - File baseDir = Files.createTempDirectory("test").toFile(); - try { - final List> changeLog = new ArrayList<>(); - Producer producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); - RecordCollector recordCollector = new RecordCollector(producer) { - @Override - public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer) { - changeLog.add(new Entry<>( - keySerializer.serialize(record.topic(), record.key()), - valueSerializer.serialize(record.topic(), record.value())) - ); - } - }; - - MockProcessorContext context = new MockProcessorContext( - null, baseDir, - byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, - recordCollector); - - WindowStore store = createWindowStore(context, serdes); - try { - long startTime = segmentSize - 4L; - - context.setTime(startTime + 0L); - store.put(0, "zero"); - context.setTime(startTime + 1L); - store.put(1, "one"); - context.setTime(startTime + 2L); - store.put(2, "two"); - context.setTime(startTime + 3L); - // (3, "three") is not put - context.setTime(startTime + 4L); - store.put(4, "four"); - context.setTime(startTime + 5L); - store.put(5, "five"); - - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize))); - assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize))); - - context.setTime(startTime + 3L); - store.put(2, "two+1"); - context.setTime(startTime + 4L); - store.put(2, "two+2"); - context.setTime(startTime + 5L); - store.put(2, "two+3"); - context.setTime(startTime + 6L); - store.put(2, "two+4"); - context.setTime(startTime + 7L); - store.put(2, "two+5"); - context.setTime(startTime + 8L); - store.put(2, "two+6"); - - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize))); - assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize))); - assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize))); - assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize))); - assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize))); - assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize))); - assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize))); - assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize))); - - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - - Map> entriesByKey = entriesByKey(changeLog, startTime); - - assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); - assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); - assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); - assertNull(entriesByKey.get(3)); - assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); - assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); - assertNull(entriesByKey.get(6)); - - } finally { - store.close(); - } - - } finally { - Utils.delete(baseDir); - } - } - - @Test - public void testPutAndFetchBefore() throws IOException { - File baseDir = Files.createTempDirectory("test").toFile(); - try { - final List> changeLog = new ArrayList<>(); - Producer producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); - RecordCollector recordCollector = new RecordCollector(producer) { - @Override - public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer) { - changeLog.add(new Entry<>( - keySerializer.serialize(record.topic(), record.key()), - valueSerializer.serialize(record.topic(), record.value())) - ); - } - }; - - MockProcessorContext context = new MockProcessorContext( - null, baseDir, - byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, - recordCollector); - - WindowStore store = createWindowStore(context, serdes); - try { - long startTime = segmentSize - 4L; - - context.setTime(startTime + 0L); - store.put(0, "zero"); - context.setTime(startTime + 1L); - store.put(1, "one"); - context.setTime(startTime + 2L); - store.put(2, "two"); - context.setTime(startTime + 3L); - // (3, "three") is not put - context.setTime(startTime + 4L); - store.put(4, "four"); - context.setTime(startTime + 5L); - store.put(5, "five"); - - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L))); - assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L))); - - context.setTime(startTime + 3L); - store.put(2, "two+1"); - context.setTime(startTime + 4L); - store.put(2, "two+2"); - context.setTime(startTime + 5L); - store.put(2, "two+3"); - context.setTime(startTime + 6L); - store.put(2, "two+4"); - context.setTime(startTime + 7L); - store.put(2, "two+5"); - context.setTime(startTime + 8L); - store.put(2, "two+6"); - - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L))); - assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L))); - assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L))); - assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L))); - assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L))); - assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L))); - assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L))); - assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L))); - assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L))); - - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - - Map> entriesByKey = entriesByKey(changeLog, startTime); - - assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); - assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); - assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); - assertNull(entriesByKey.get(3)); - assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); - assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); - assertNull(entriesByKey.get(6)); - - } finally { - store.close(); - } - - } finally { - Utils.delete(baseDir); - } - } - - @Test - public void testPutAndFetchAfter() throws IOException { - File baseDir = Files.createTempDirectory("test").toFile(); - try { - final List> changeLog = new ArrayList<>(); - Producer producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); - RecordCollector recordCollector = new RecordCollector(producer) { - @Override - public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer) { - changeLog.add(new Entry<>( - keySerializer.serialize(record.topic(), record.key()), - valueSerializer.serialize(record.topic(), record.value())) - ); - } - }; - - MockProcessorContext context = new MockProcessorContext( - null, baseDir, - byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, - recordCollector); - - WindowStore store = createWindowStore(context, serdes); - try { - long startTime = segmentSize - 4L; - - context.setTime(startTime + 0L); - store.put(0, "zero"); - context.setTime(startTime + 1L); - store.put(1, "one"); - context.setTime(startTime + 2L); - store.put(2, "two"); - context.setTime(startTime + 3L); - // (3, "three") is not put - context.setTime(startTime + 4L); - store.put(4, "four"); - context.setTime(startTime + 5L); - store.put(5, "five"); - - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize))); - assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize))); - - context.setTime(startTime + 3L); - store.put(2, "two+1"); - context.setTime(startTime + 4L); - store.put(2, "two+2"); - context.setTime(startTime + 5L); - store.put(2, "two+3"); - context.setTime(startTime + 6L); - store.put(2, "two+4"); - context.setTime(startTime + 7L); - store.put(2, "two+5"); - context.setTime(startTime + 8L); - store.put(2, "two+6"); - - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize))); - assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize))); - assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize))); - assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize))); - assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize))); - assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize))); - assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize))); - assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize))); - - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - - Map> entriesByKey = entriesByKey(changeLog, startTime); - - assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); - assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); - assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); - assertNull(entriesByKey.get(3)); - assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); - assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); - assertNull(entriesByKey.get(6)); - - } finally { - store.close(); - } - - } finally { - Utils.delete(baseDir); - } - } - - @Test - public void testPutSameKeyTimestamp() throws IOException { - File baseDir = Files.createTempDirectory("test").toFile(); - try { - final List> changeLog = new ArrayList<>(); - Producer producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); - RecordCollector recordCollector = new RecordCollector(producer) { - @Override - public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer) { - changeLog.add(new Entry<>( - keySerializer.serialize(record.topic(), record.key()), - valueSerializer.serialize(record.topic(), record.value())) - ); - } - }; - - MockProcessorContext context = new MockProcessorContext( - null, baseDir, - byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, - recordCollector); - - WindowStore store = createWindowStore(context, serdes); - try { - long startTime = segmentSize - 4L; - - context.setTime(startTime); - store.put(0, "zero"); - - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); - - context.setTime(startTime); - store.put(0, "zero"); - context.setTime(startTime); - store.put(0, "zero+"); - context.setTime(startTime); - store.put(0, "zero++"); - - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize))); - assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize))); - - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - - Map> entriesByKey = entriesByKey(changeLog, startTime); - - assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0)); - - } finally { - store.close(); - } - - } finally { - Utils.delete(baseDir); - } - } - - @Test - public void testRolling() throws IOException { - File baseDir = Files.createTempDirectory("test").toFile(); - try { - final List> changeLog = new ArrayList<>(); - Producer producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); - RecordCollector recordCollector = new RecordCollector(producer) { - @Override - public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer) { - changeLog.add(new Entry<>( - keySerializer.serialize(record.topic(), record.key()), - valueSerializer.serialize(record.topic(), record.value())) - ); - } - }; - - MockProcessorContext context = new MockProcessorContext( - null, baseDir, - byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, - recordCollector); - - WindowStore store = createWindowStore(context, serdes); - RocksDBWindowStore inner = - (RocksDBWindowStore) ((MeteredWindowStore) store).inner(); - try { - long startTime = segmentSize * 2; - long incr = segmentSize / 2; - - context.setTime(startTime); - store.put(0, "zero"); - assertEquals(Utils.mkSet(2L), inner.segmentIds()); - - context.setTime(startTime + incr); - store.put(1, "one"); - assertEquals(Utils.mkSet(2L), inner.segmentIds()); - - context.setTime(startTime + incr * 2); - store.put(2, "two"); - assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds()); - - context.setTime(startTime + incr * 3); - // (3, "three") is not put - assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds()); - - context.setTime(startTime + incr * 4); - store.put(4, "four"); - assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds()); - - context.setTime(startTime + incr * 5); - store.put(5, "five"); - assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds()); - - assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - - context.setTime(startTime + incr * 6); - store.put(6, "six"); - assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds()); - - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - - - context.setTime(startTime + incr * 7); - store.put(7, "seven"); - assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds()); - - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); - - context.setTime(startTime + incr * 8); - store.put(8, "eight"); - assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds()); - - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); - assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); - - // check segment directories - store.flush(); - assertEquals( - Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)), - segmentDirs(baseDir) - ); - } finally { - store.close(); - } - - } finally { - Utils.delete(baseDir); - } - } - - @Test - public void testRestore() throws IOException { - final List> changeLog = new ArrayList<>(); - long startTime = segmentSize * 2; - long incr = segmentSize / 2; - - File baseDir = Files.createTempDirectory("test").toFile(); - try { - Producer producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); - RecordCollector recordCollector = new RecordCollector(producer) { - @Override - public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer) { - changeLog.add(new Entry<>( - keySerializer.serialize(record.topic(), record.key()), - valueSerializer.serialize(record.topic(), record.value())) - ); - } - }; - - MockProcessorContext context = new MockProcessorContext( - null, baseDir, - byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, - recordCollector); - - WindowStore store = createWindowStore(context, serdes); - try { - context.setTime(startTime); - store.put(0, "zero"); - context.setTime(startTime + incr); - store.put(1, "one"); - context.setTime(startTime + incr * 2); - store.put(2, "two"); - context.setTime(startTime + incr * 3); - store.put(3, "three"); - context.setTime(startTime + incr * 4); - store.put(4, "four"); - context.setTime(startTime + incr * 5); - store.put(5, "five"); - context.setTime(startTime + incr * 6); - store.put(6, "six"); - context.setTime(startTime + incr * 7); - store.put(7, "seven"); - context.setTime(startTime + incr * 8); - store.put(8, "eight"); - store.flush(); - - } finally { - store.close(); - } - - - } finally { - Utils.delete(baseDir); - } - - File baseDir2 = Files.createTempDirectory("test").toFile(); - try { - Producer producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); - RecordCollector recordCollector = new RecordCollector(producer) { - @Override - public void send(ProducerRecord record, Serializer keySerializer, Serializer valueSerializer) { - changeLog.add(new Entry<>( - keySerializer.serialize(record.topic(), record.key()), - valueSerializer.serialize(record.topic(), record.value())) - ); - } - }; - - MockProcessorContext context = new MockProcessorContext( - null, baseDir, - byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, - recordCollector); - - WindowStore store = createWindowStore(context, serdes); - RocksDBWindowStore inner = - (RocksDBWindowStore) ((MeteredWindowStore) store).inner(); - - try { - context.restore("window", changeLog); - - assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds()); - - assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize))); - assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize))); - assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize))); - assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize))); - assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize))); - assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize))); - assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize))); - - // check segment directories - store.flush(); - assertEquals( - Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)), - segmentDirs(baseDir) - ); - } finally { - store.close(); - } - - - } finally { - Utils.delete(baseDir2); - } - } - - private List toList(WindowStoreIterator iterator) { - ArrayList list = new ArrayList<>(); - while (iterator.hasNext()) { - list.add(iterator.next().value); - } - return list; - } - - private Set segmentDirs(File baseDir) { - File rocksDbDir = new File(baseDir, "rocksdb"); - String[] subdirs = rocksDbDir.list(); - - HashSet set = new HashSet<>(); - - for (String subdir : subdirs) { - if (subdir.startsWith("window-")) - set.add(subdir.substring(7)); - } - return set; - } - - private Map> entriesByKey(List> changeLog, long startTime) { - HashMap> entriesByKey = new HashMap<>(); - - for (Entry entry : changeLog) { - long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key()); - Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key(), serdes); - String value = entry.value() == null ? null : serdes.valueFrom(entry.value()); - - Set entries = entriesByKey.get(key); - if (entries == null) { - entries = new HashSet<>(); - entriesByKey.put(key, entries); - } - entries.add(value + "@" + (timestamp - startTime)); - } - - return entriesByKey; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java new file mode 100644 index 0000000..2ed698c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.Entry; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.KeyValueStoreTestDriver; +import org.junit.Test; + +public abstract class AbstractKeyValueStoreTest { + + protected abstract KeyValueStore createKeyValueStore(ProcessorContext context, + Class keyClass, Class valueClass, + boolean useContextSerdes); + + @Test + public void testPutGetRange() { + // Create the test driver ... + KeyValueStoreTestDriver driver = KeyValueStoreTestDriver.create(); + KeyValueStore store = createKeyValueStore(driver.context(), Integer.class, String.class, false); + try { + + // Verify that the store reads and writes correctly ... + store.put(0, "zero"); + store.put(1, "one"); + store.put(2, "two"); + store.put(4, "four"); + store.put(5, "five"); + assertEquals(5, driver.sizeOf(store)); + assertEquals("zero", store.get(0)); + assertEquals("one", store.get(1)); + assertEquals("two", store.get(2)); + assertNull(store.get(3)); + assertEquals("four", store.get(4)); + assertEquals("five", store.get(5)); + store.delete(5); + + // Flush the store and verify all current entries were properly flushed ... + store.flush(); + assertEquals("zero", driver.flushedEntryStored(0)); + assertEquals("one", driver.flushedEntryStored(1)); + assertEquals("two", driver.flushedEntryStored(2)); + assertEquals("four", driver.flushedEntryStored(4)); + assertEquals(null, driver.flushedEntryStored(5)); + + assertEquals(false, driver.flushedEntryRemoved(0)); + assertEquals(false, driver.flushedEntryRemoved(1)); + assertEquals(false, driver.flushedEntryRemoved(2)); + assertEquals(false, driver.flushedEntryRemoved(4)); + assertEquals(true, driver.flushedEntryRemoved(5)); + + // Check range iteration ... + try (KeyValueIterator iter = store.range(2, 4)) { + while (iter.hasNext()) { + Entry entry = iter.next(); + if (entry.key().equals(2)) + assertEquals("two", entry.value()); + else if (entry.key().equals(4)) + assertEquals("four", entry.value()); + else + fail("Unexpected entry: " + entry); + } + } + + // Check range iteration ... + try (KeyValueIterator iter = store.range(2, 6)) { + while (iter.hasNext()) { + Entry entry = iter.next(); + if (entry.key().equals(2)) + assertEquals("two", entry.value()); + else if (entry.key().equals(4)) + assertEquals("four", entry.value()); + else + fail("Unexpected entry: " + entry); + } + } + } finally { + store.close(); + } + } + + @Test + public void testPutGetRangeWithDefaultSerdes() { + // Create the test driver ... + KeyValueStoreTestDriver driver = KeyValueStoreTestDriver.create(Integer.class, String.class); + KeyValueStore store = createKeyValueStore(driver.context(), Integer.class, String.class, true); + try { + + // Verify that the store reads and writes correctly ... + store.put(0, "zero"); + store.put(1, "one"); + store.put(2, "two"); + store.put(4, "four"); + store.put(5, "five"); + assertEquals(5, driver.sizeOf(store)); + assertEquals("zero", store.get(0)); + assertEquals("one", store.get(1)); + assertEquals("two", store.get(2)); + assertNull(store.get(3)); + assertEquals("four", store.get(4)); + assertEquals("five", store.get(5)); + store.delete(5); + + // Flush the store and verify all current entries were properly flushed ... + store.flush(); + assertEquals("zero", driver.flushedEntryStored(0)); + assertEquals("one", driver.flushedEntryStored(1)); + assertEquals("two", driver.flushedEntryStored(2)); + assertEquals("four", driver.flushedEntryStored(4)); + assertEquals(null, driver.flushedEntryStored(5)); + + assertEquals(false, driver.flushedEntryRemoved(0)); + assertEquals(false, driver.flushedEntryRemoved(1)); + assertEquals(false, driver.flushedEntryRemoved(2)); + assertEquals(false, driver.flushedEntryRemoved(4)); + assertEquals(true, driver.flushedEntryRemoved(5)); + } finally { + store.close(); + } + } + + @Test + public void testRestore() { + // Create the test driver ... + KeyValueStoreTestDriver driver = KeyValueStoreTestDriver.create(Integer.class, String.class); + + // Add any entries that will be restored to any store + // that uses the driver's context ... + driver.addEntryToRestoreLog(0, "zero"); + driver.addEntryToRestoreLog(1, "one"); + driver.addEntryToRestoreLog(2, "two"); + driver.addEntryToRestoreLog(4, "four"); + + // Create the store, which should register with the context and automatically + // receive the restore entries ... + KeyValueStore store = createKeyValueStore(driver.context(), Integer.class, String.class, false); + try { + // Verify that the store's contents were properly restored ... + assertEquals(0, driver.checkForRestoredEntries(store)); + + // and there are no other entries ... + assertEquals(4, driver.sizeOf(store)); + } finally { + store.close(); + } + } + + @Test + public void testRestoreWithDefaultSerdes() { + // Create the test driver ... + KeyValueStoreTestDriver driver = KeyValueStoreTestDriver.create(Integer.class, String.class); + + // Add any entries that will be restored to any store + // that uses the driver's context ... + driver.addEntryToRestoreLog(0, "zero"); + driver.addEntryToRestoreLog(1, "one"); + driver.addEntryToRestoreLog(2, "two"); + driver.addEntryToRestoreLog(4, "four"); + + // Create the store, which should register with the context and automatically + // receive the restore entries ... + KeyValueStore store = createKeyValueStore(driver.context(), Integer.class, String.class, true); + try { + // Verify that the store's contents were properly restored ... + assertEquals(0, driver.checkForRestoredEntries(store)); + + // and there are no other entries ... + assertEquals(4, driver.sizeOf(store)); + } finally { + store.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java new file mode 100644 index 0000000..2b0927e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; + +public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { + + @SuppressWarnings("unchecked") + @Override + protected KeyValueStore createKeyValueStore( + ProcessorContext context, + Class keyClass, Class valueClass, + boolean useContextSerdes) { + + StateStoreSupplier supplier; + if (useContextSerdes) { + Serializer keySer = (Serializer) context.keySerializer(); + Deserializer keyDeser = (Deserializer) context.keyDeserializer(); + Serializer valSer = (Serializer) context.valueSerializer(); + Deserializer valDeser = (Deserializer) context.valueDeserializer(); + supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build(); + } else { + supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build(); + } + + KeyValueStore store = (KeyValueStore) supplier.get(); + store.init(context); + return store; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java new file mode 100644 index 0000000..d7cc5b9 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.KeyValueStoreTestDriver; +import org.apache.kafka.streams.state.Stores; +import org.junit.Test; + +public class InMemoryLRUCacheStoreTest { + + @SuppressWarnings("unchecked") + @Test + public void testPutGetRange() { + // Create the test driver ... + KeyValueStoreTestDriver driver = KeyValueStoreTestDriver.create(); + StateStoreSupplier supplier = Stores.create("my-store") + .withIntegerKeys().withStringValues() + .inMemory().maxEntries(3) + .build(); + KeyValueStore store = (KeyValueStore) supplier.get(); + store.init(driver.context()); + + // Verify that the store reads and writes correctly, keeping only the last 2 entries ... + store.put(0, "zero"); + store.put(1, "one"); + store.put(2, "two"); + store.put(3, "three"); + store.put(4, "four"); + store.put(5, "five"); + + // It should only keep the last 4 added ... + assertEquals(3, driver.sizeOf(store)); + assertNull(store.get(0)); + assertNull(store.get(1)); + assertNull(store.get(2)); + assertEquals("three", store.get(3)); + assertEquals("four", store.get(4)); + assertEquals("five", store.get(5)); + store.delete(5); + + // Flush the store and verify all current entries were properly flushed ... + store.flush(); + assertNull(driver.flushedEntryStored(0)); + assertNull(driver.flushedEntryStored(1)); + assertNull(driver.flushedEntryStored(2)); + assertEquals("three", driver.flushedEntryStored(3)); + assertEquals("four", driver.flushedEntryStored(4)); + assertNull(driver.flushedEntryStored(5)); + + assertEquals(true, driver.flushedEntryRemoved(0)); + assertEquals(true, driver.flushedEntryRemoved(1)); + assertEquals(true, driver.flushedEntryRemoved(2)); + assertEquals(false, driver.flushedEntryRemoved(3)); + assertEquals(false, driver.flushedEntryRemoved(4)); + assertEquals(true, driver.flushedEntryRemoved(5)); + } + + @SuppressWarnings("unchecked") + @Test + public void testPutGetRangeWithDefaultSerdes() { + // Create the test driver ... + KeyValueStoreTestDriver driver = KeyValueStoreTestDriver.create(); + + Serializer keySer = (Serializer) driver.context().keySerializer(); + Deserializer keyDeser = (Deserializer) driver.context().keyDeserializer(); + Serializer valSer = (Serializer) driver.context().valueSerializer(); + Deserializer valDeser = (Deserializer) driver.context().valueDeserializer(); + StateStoreSupplier supplier = Stores.create("my-store") + .withKeys(keySer, keyDeser) + .withValues(valSer, valDeser) + .inMemory().maxEntries(3) + .build(); + KeyValueStore store = (KeyValueStore) supplier.get(); + store.init(driver.context()); + + // Verify that the store reads and writes correctly, keeping only the last 2 entries ... + store.put(0, "zero"); + store.put(1, "one"); + store.put(2, "two"); + store.put(3, "three"); + store.put(4, "four"); + store.put(5, "five"); + + // It should only keep the last 4 added ... + assertEquals(3, driver.sizeOf(store)); + assertNull(store.get(0)); + assertNull(store.get(1)); + assertNull(store.get(2)); + assertEquals("three", store.get(3)); + assertEquals("four", store.get(4)); + assertEquals("five", store.get(5)); + store.delete(5); + + // Flush the store and verify all current entries were properly flushed ... + store.flush(); + assertNull(driver.flushedEntryStored(0)); + assertNull(driver.flushedEntryStored(1)); + assertNull(driver.flushedEntryStored(2)); + assertEquals("three", driver.flushedEntryStored(3)); + assertEquals("four", driver.flushedEntryStored(4)); + assertNull(driver.flushedEntryStored(5)); + + assertEquals(true, driver.flushedEntryRemoved(0)); + assertEquals(true, driver.flushedEntryRemoved(1)); + assertEquals(true, driver.flushedEntryRemoved(2)); + assertEquals(false, driver.flushedEntryRemoved(3)); + assertEquals(false, driver.flushedEntryRemoved(4)); + assertEquals(true, driver.flushedEntryRemoved(5)); + } + + @Test + public void testRestore() { + // Create the test driver ... + KeyValueStoreTestDriver driver = KeyValueStoreTestDriver.create(Integer.class, String.class); + + // Add any entries that will be restored to any store + // that uses the driver's context ... + driver.addEntryToRestoreLog(1, "one"); + driver.addEntryToRestoreLog(2, "two"); + driver.addEntryToRestoreLog(4, "four"); + + // Create the store, which should register with the context and automatically + // receive the restore entries ... + StateStoreSupplier supplier = Stores.create("my-store") + .withIntegerKeys().withStringValues() + .inMemory().maxEntries(3) + .build(); + KeyValueStore store = (KeyValueStore) supplier.get(); + store.init(driver.context()); + + // Verify that the store's contents were properly restored ... + assertEquals(0, driver.checkForRestoredEntries(store)); + + // and there are no other entries ... + assertEquals(3, driver.sizeOf(store)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java new file mode 100644 index 0000000..29a3c0a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; + +public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { + + @SuppressWarnings("unchecked") + @Override + protected KeyValueStore createKeyValueStore( + ProcessorContext context, + Class keyClass, + Class valueClass, + boolean useContextSerdes) { + + StateStoreSupplier supplier; + if (useContextSerdes) { + Serializer keySer = (Serializer) context.keySerializer(); + Deserializer keyDeser = (Deserializer) context.keyDeserializer(); + Serializer valSer = (Serializer) context.valueSerializer(); + Deserializer valDeser = (Deserializer) context.valueDeserializer(); + supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build(); + } else { + supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build(); + } + + KeyValueStore store = (KeyValueStore) supplier.get(); + store.init(context); + return store; + + } +}