kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [kafka] branch 2.0 updated: KAFKA-7158: Add unit test for window store range queries (#5466)
Date: Wed, 08 Aug 2018 21:02:38 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 27224a3  KAFKA-7158: Add unit test for window store range queries (#5466)
27224a3 is described below

commit 27224a38d923cb28f650710e0af4a35d785e3025
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Wed Aug 8 13:53:34 2018 -0700

    KAFKA-7158: Add unit test for window store range queries (#5466)
    
    While debugging the reported issue, I found that our current unit test lacks coverage to actually expose the underlying root cause.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
    
    minor fix
---
 .../state/internals/CachingWindowStoreTest.java    | 104 +++++++++++++++++++++
 1 file changed, 104 insertions(+)
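
For context, the range queries in question are the WindowStore iteration methods:
all() (exercised by the new test) and the keyed fetch variants. A minimal sketch of
the fetch-based queries follows, using the same imports as the test file; the store
variable, keys, and time bounds are hypothetical and not taken from this patch:

    // Hedged sketch, not part of this commit: range queries against a
    // WindowStore<String, String> named "store"; keys and bounds are made up.
    try (final WindowStoreIterator<String> timeRange = store.fetch("key", 0L, 60000L)) {
        while (timeRange.hasNext()) {
            final KeyValue<Long, String> entry = timeRange.next(); // timestamp -> value
        }
    }
    try (final KeyValueIterator<Windowed<String>, String> keyRange = store.fetch("a", "z", 0L, 60000L)) {
        while (keyRange.hasNext()) {
            final KeyValue<Windowed<String>, String> entry = keyRange.next(); // (key, window) -> value
        }
    }

With caching enabled, each distinct (key, window) pair should appear exactly once in
such iterators; the test below asserts this by counting the entries returned by
store.all() after every record it processes.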

diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index b8808ca..551aeb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -16,20 +16,31 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -38,6 +49,8 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
 
 import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
 import static org.apache.kafka.test.StreamsTestUtils.toList;
@@ -91,6 +104,97 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    public void shouldNotReturnDuplicatesInRanges() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder(
+            Stores.persistentWindowStore("store-name", 3600000L, 3, 60000L, false),
+            Serdes.String(),
+            Serdes.String())
+            .withCachingEnabled();
+
+        builder.addStateStore(storeBuilder);
+
+        builder.stream(topic,
+            Consumed.with(Serdes.String(), Serdes.String()))
+            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
+                private WindowStore<String, String> store;
+                private int numRecordsProcessed;
+
+                @Override
+                public void init(final ProcessorContext processorContext) {
+                    this.store = (WindowStore<String, String>) processorContext.getStateStore("store-name");
+                    int count = 0;
+
+                    final KeyValueIterator<Windowed<String>, String> all = store.all();
+                    while (all.hasNext()) {
+                        count++;
+                        all.next();
+                    }
+
+                    assertThat(count, equalTo(0));
+                }
+
+                @Override
+                public KeyValue<String, String> transform(final String key, final String value) {
+                    int count = 0;
+
+                    final KeyValueIterator<Windowed<String>, String> all = store.all();
+                    while (all.hasNext()) {
+                        count++;
+                        all.next();
+                    }
+                    assertThat(count, equalTo(numRecordsProcessed));
+
+                    store.put(value, value);
+
+                    numRecordsProcessed++;
+
+                    return new KeyValue<>(key, value);
+                }
+
+                @Override
+                public void close() {
+
+                }
+            }, "store-name");
+
+        final String bootstrapServers = "localhost:9092";
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
+
+        final long initialWallClockTime = 0L;
+        final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime);
+
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(Serdes.String().serializer(), Serdes.String().serializer(), initialWallClockTime);
+
+        for (int i = 0; i < 5; i++) {
+            driver.pipeInput(recordFactory.create(topic, UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+        }
+        driver.advanceWallClockTime(10 * 1000L);
+        recordFactory.advanceTimeMs(10 * 1000L);
+        for (int i = 0; i < 5; i++) {
+            driver.pipeInput(recordFactory.create(topic, UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+        }
+        driver.advanceWallClockTime(10 * 1000L);
+        recordFactory.advanceTimeMs(10 * 1000L);
+        for (int i = 0; i < 5; i++) {
+            driver.pipeInput(recordFactory.create(topic, UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+        }
+        driver.advanceWallClockTime(10 * 1000L);
+        recordFactory.advanceTimeMs(10 * 1000L);
+        for (int i = 0; i < 5; i++) {
+            driver.pipeInput(recordFactory.create(topic, UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+        }
+    }
+
+    @Test
     public void shouldPutFetchFromCache() {
         cachingStore.put(bytesKey("a"), bytesValue("a"));
         cachingStore.put(bytesKey("b"), bytesValue("b"));


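To reproduce locally, a Gradle invocation along these lines should run just this test
class (the module path and test filter assume the standard kafka.git build layout):

    ./gradlew :streams:test --tests org.apache.kafka.streams.state.internals.CachingWindowStoreTest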