From commits-return-10975-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Wed Jan 9 15:05:06 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 06A32180676 for ; Wed, 9 Jan 2019 15:05:03 +0100 (CET) Received: (qmail 72954 invoked by uid 500); 9 Jan 2019 14:05:03 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 72941 invoked by uid 99); 9 Jan 2019 14:05:03 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Jan 2019 14:05:03 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 3F3B085BFB; Wed, 9 Jan 2019 14:05:02 +0000 (UTC) Date: Wed, 09 Jan 2019 14:05:01 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: MINOR: code cleanup (#6057) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154704270061.29088.12080798888729171618@gitbox.apache.org> From: mjsax@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 6e7149b77a10ac6aa4da2edd549f468015170236 X-Git-Newrev: 86de2dfd27f96dd3fea0635ad110c63bc639c721 X-Git-Rev: 86de2dfd27f96dd3fea0635ad110c63bc639c721 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 86de2df MINOR: code cleanup (#6057) 86de2df is described below commit 86de2dfd27f96dd3fea0635ad110c63bc639c721 Author: Matthias J. Sax AuthorDate: Wed Jan 9 15:04:52 2019 +0100 MINOR: code cleanup (#6057) Reviewers: Bill Bejeck , John Roesler , Guozhang Wang --- .../streams/integration/EosIntegrationTest.java | 267 +++++++++------------ .../GlobalKTableEOSIntegrationTest.java | 126 ++++------ .../integration/GlobalKTableIntegrationTest.java | 102 +++----- .../integration/GlobalThreadShutDownOrderTest.java | 68 ++---- .../KTableSourceTopicRestartIntegrationTest.java | 31 +-- .../PurgeRepartitionTopicIntegrationTest.java | 71 +++--- .../integration/QueryableStateIntegrationTest.java | 254 +++++++++++++------- 7 files changed, 442 insertions(+), 477 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index bdfbb3b..505d454 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -40,7 +41,6 @@ import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.StreamsTestUtils; -import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.ClassRule; @@ -49,6 +49,7 @@ import org.junit.experimental.categories.Category; import java.io.File; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -98,7 +99,7 @@ public class EosIntegrationTest { private int testNumber = 0; @Before - public void createTopics() throws InterruptedException { + public void createTopics() throws Exception { applicationId = "appId-" + ++testNumber; CLUSTER.deleteTopicsAndWait( SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC, @@ -154,26 +155,24 @@ public class EosIntegrationTest { output.to(outputTopic); for (int i = 0; i < numberOfRestarts; ++i) { - final KafkaStreams streams = new KafkaStreams( - builder.build(), - StreamsTestUtils.getStreamsConfig( - applicationId, - CLUSTER.bootstrapServers(), - Serdes.LongSerde.class.getName(), - Serdes.LongSerde.class.getName(), - new Properties() { - { - put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1); - put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); - put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); - put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); - } - })); + final Properties config = StreamsTestUtils.getStreamsConfig( + applicationId, + CLUSTER.bootstrapServers(), + Serdes.LongSerde.class.getName(), + Serdes.LongSerde.class.getName(), + new Properties() { + { + put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1); + put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); + put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); + put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); + } + }); - try { + try (final KafkaStreams streams = new KafkaStreams(builder.build(), config)) { streams.start(); final List> inputData = prepareData(i * 100, i * 100 + 10L, 0L, 1L); @@ -192,18 +191,15 @@ public class EosIntegrationTest { CONSUMER_GROUP_ID, LongDeserializer.class, LongDeserializer.class, - new Properties() { - { - put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)); - } - }), + Utils.mkProperties(Collections.singletonMap( + ConsumerConfig.ISOLATION_LEVEL_CONFIG, + IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT))) + ), outputTopic, inputData.size() ); checkResultPerKey(committedRecords, inputData); - } finally { - streams.close(); } } } @@ -220,13 +216,15 @@ public class EosIntegrationTest { } - private void addAllKeys(final Set allKeys, final List> records) { + private void addAllKeys(final Set allKeys, + final List> records) { for (final KeyValue record : records) { allKeys.add(record.key); } } - private List> getAllRecordPerKey(final Long key, final List> records) { + private List> getAllRecordPerKey(final Long key, + final List> records) { final List> recordsPerKey = new ArrayList<>(records.size()); for (final KeyValue record : records) { @@ -243,24 +241,21 @@ public class EosIntegrationTest { final StreamsBuilder builder = new StreamsBuilder(); builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC); - final KafkaStreams streams = new KafkaStreams( - builder.build(), - StreamsTestUtils.getStreamsConfig( - applicationId, - CLUSTER.bootstrapServers(), - Serdes.LongSerde.class.getName(), - Serdes.LongSerde.class.getName(), - new Properties() { - { - put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); - put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - } - })); - - try { + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + final Properties config = StreamsTestUtils.getStreamsConfig( + applicationId, + CLUSTER.bootstrapServers(), + Serdes.LongSerde.class.getName(), + Serdes.LongSerde.class.getName(), + properties); + + try (final KafkaStreams streams = new KafkaStreams(builder.build(), config)) { streams.start(); final List> firstBurstOfData = prepareData(0L, 5L, 0L); @@ -315,8 +310,6 @@ public class EosIntegrationTest { ); assertThat(secondCommittedRecords, equalTo(secondBurstOfData)); - } finally { - streams.close(); } } @@ -330,8 +323,7 @@ public class EosIntegrationTest { // -> the failure only kills one thread // after fail over, we should read 40 committed records (even if 50 record got written) - final KafkaStreams streams = getKafkaStreams(false, "appDir", 2); - try { + try (final KafkaStreams streams = getKafkaStreams(false, "appDir", 2)) { streams.start(); final List> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L); @@ -345,12 +337,9 @@ public class EosIntegrationTest { writeInputData(committedDataBeforeFailure); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return commitRequested.get() == 2; - } - }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit."); + TestUtils.waitForCondition( + () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS, + "SteamsTasks did not request commit."); writeInputData(uncommittedDataBeforeFailure); @@ -363,12 +352,9 @@ public class EosIntegrationTest { errorInjected.set(true); writeInputData(dataAfterFailure); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return uncaughtException != null; - } - }, MAX_WAIT_TIME_MS, "Should receive uncaught exception from one StreamThread."); + TestUtils.waitForCondition( + () -> uncaughtException != null, MAX_WAIT_TIME_MS, + "Should receive uncaught exception from one StreamThread."); final List> allCommittedRecords = readResult( committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(), @@ -389,8 +375,6 @@ public class EosIntegrationTest { checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery); checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery); - } finally { - streams.close(); } } @@ -407,8 +391,7 @@ public class EosIntegrationTest { // after fail over, we should read 40 committed records and the state stores should contain the correct sums // per key (even if some records got processed twice) - final KafkaStreams streams = getKafkaStreams(true, "appDir", 2); - try { + try (final KafkaStreams streams = getKafkaStreams(true, "appDir", 2)) { streams.start(); final List> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L); @@ -422,12 +405,9 @@ public class EosIntegrationTest { writeInputData(committedDataBeforeFailure); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return commitRequested.get() == 2; - } - }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit."); + TestUtils.waitForCondition( + () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS, + "SteamsTasks did not request commit."); writeInputData(uncommittedDataBeforeFailure); @@ -442,12 +422,9 @@ public class EosIntegrationTest { errorInjected.set(true); writeInputData(dataAfterFailure); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return uncaughtException != null; - } - }, MAX_WAIT_TIME_MS, "Should receive uncaught exception from one StreamThread."); + TestUtils.waitForCondition( + () -> uncaughtException != null, MAX_WAIT_TIME_MS, + "Should receive uncaught exception from one StreamThread."); final List> allCommittedRecords = readResult( committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(), @@ -465,11 +442,11 @@ public class EosIntegrationTest { final List> expectedResult = computeExpectedResult(allExpectedCommittedRecordsAfterRecovery); checkResultPerKey(allCommittedRecords, expectedResult); - checkResultPerKey(committedRecordsAfterFailure, expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size())); + checkResultPerKey( + committedRecordsAfterFailure, + expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size())); verifyStateStore(streams, getMaxPerKey(expectedResult)); - } finally { - streams.close(); } } @@ -486,9 +463,10 @@ public class EosIntegrationTest { // afterwards, the "stalling" thread resumes, and another rebalance should get triggered // we write the remaining 20 records and verify to read 60 result records - final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1); - final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1); - try { + try ( + final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1); + final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1) + ) { streams1.start(); streams2.start(); @@ -505,12 +483,9 @@ public class EosIntegrationTest { writeInputData(committedDataBeforeGC); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return commitRequested.get() == 2; - } - }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit."); + TestUtils.waitForCondition( + () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS, + "SteamsTasks did not request commit."); writeInputData(uncommittedDataBeforeGC); @@ -523,14 +498,12 @@ public class EosIntegrationTest { gcInjected.set(true); writeInputData(dataToTriggerFirstRebalance); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return streams1.allMetadata().size() == 1 && streams2.allMetadata().size() == 1 && - (streams1.allMetadata().iterator().next().topicPartitions().size() == 2 - || streams2.allMetadata().iterator().next().topicPartitions().size() == 2); - } - }, MAX_WAIT_TIME_MS, "Should have rebalanced."); + TestUtils.waitForCondition( + () -> streams1.allMetadata().size() == 1 + && streams2.allMetadata().size() == 1 + && (streams1.allMetadata().iterator().next().topicPartitions().size() == 2 + || streams2.allMetadata().iterator().next().topicPartitions().size() == 2), + MAX_WAIT_TIME_MS, "Should have rebalanced."); final List> committedRecordsAfterRebalance = readResult( uncommittedDataBeforeGC.size() + dataToTriggerFirstRebalance.size(), @@ -543,14 +516,13 @@ public class EosIntegrationTest { checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance); doGC = false; - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return streams1.allMetadata().size() == 1 && streams2.allMetadata().size() == 1 - && streams1.allMetadata().iterator().next().topicPartitions().size() == 1 - && streams2.allMetadata().iterator().next().topicPartitions().size() == 1; - } - }, MAX_WAIT_TIME_MS, "Should have rebalanced."); + TestUtils.waitForCondition( + () -> streams1.allMetadata().size() == 1 + && streams2.allMetadata().size() == 1 + && streams1.allMetadata().iterator().next().topicPartitions().size() == 1 + && streams2.allMetadata().iterator().next().topicPartitions().size() == 1, + MAX_WAIT_TIME_MS, + "Should have rebalanced."); writeInputData(dataAfterSecondRebalance); @@ -566,13 +538,12 @@ public class EosIntegrationTest { allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterSecondRebalance); checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery); - } finally { - streams1.close(); - streams2.close(); } } - private List> prepareData(final long fromInclusive, final long toExclusive, final Long... keys) { + private List> prepareData(final long fromInclusive, + final long toExclusive, + final Long... keys) { final List> data = new ArrayList<>(); for (final Long k : keys) { @@ -584,7 +555,9 @@ public class EosIntegrationTest { return data; } - private KafkaStreams getKafkaStreams(final boolean withState, final String appDir, final int numberOfStreamsThreads) { + private KafkaStreams getKafkaStreams(final boolean withState, + final String appDir, + final int numberOfStreamsThreads) { commitRequested = new AtomicInteger(0); errorInjected = new AtomicBoolean(false); gcInjected = new AtomicBoolean(false); @@ -619,7 +592,8 @@ public class EosIntegrationTest { } @Override - public KeyValue transform(final Long key, final Long value) { + public KeyValue transform(final Long key, + final Long value) { if (gcInjected.compareAndSet(true, false)) { while (doGC) { try { @@ -666,38 +640,34 @@ public class EosIntegrationTest { } }, storeNames) .to(SINGLE_PARTITION_OUTPUT_TOPIC); - final KafkaStreams streams = new KafkaStreams( - builder.build(), - StreamsTestUtils.getStreamsConfig( - applicationId, - CLUSTER.bootstrapServers(), - Serdes.LongSerde.class.getName(), - Serdes.LongSerde.class.getName(), - new Properties() { - { - put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads); - put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE); - put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); - put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); - put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000); - put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1); - put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS); - put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir); - put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:2142"); - } - })); - - streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - if (uncaughtException != null) { - e.printStackTrace(System.err); - fail("Should only get one uncaught exception from Streams."); - } - uncaughtException = e; + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE); + properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); + properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); + properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000); + properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1); + properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS); + properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir); + properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:2142"); + + final Properties config = StreamsTestUtils.getStreamsConfig( + applicationId, + CLUSTER.bootstrapServers(), + Serdes.LongSerde.class.getName(), + Serdes.LongSerde.class.getName(), + properties); + + final KafkaStreams streams = new KafkaStreams(builder.build(), config); + + streams.setUncaughtExceptionHandler((t, e) -> { + if (uncaughtException != null) { + e.printStackTrace(System.err); + fail("Should only get one uncaught exception from Streams."); } + uncaughtException = e; }); return streams; @@ -713,7 +683,7 @@ public class EosIntegrationTest { } private List> readResult(final int numberOfRecords, - final String groupId) throws InterruptedException { + final String groupId) throws Exception { if (groupId != null) { return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( TestUtils.consumerConfig( @@ -778,13 +748,14 @@ public class EosIntegrationTest { return expectedResult; } - private void verifyStateStore(final KafkaStreams streams, final Set> expectedStoreContent) { + private void verifyStateStore(final KafkaStreams streams, + final Set> expectedStoreContent) { ReadOnlyKeyValueStore store = null; final long maxWaitingTime = System.currentTimeMillis() + 300000L; while (System.currentTimeMillis() < maxWaitingTime) { try { - store = streams.store(storeName, QueryableStoreTypes.keyValueStore()); + store = streams.store(storeName, QueryableStoreTypes.keyValueStore()); break; } catch (final InvalidStateStoreException okJustRetry) { try { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 084519e..787cb29 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -31,6 +30,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; @@ -41,7 +41,6 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -49,7 +48,6 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -72,18 +70,8 @@ public class GlobalKTableEOSIntegrationTest { private static volatile int testNo = 0; private final MockTime mockTime = CLUSTER.time; - private final KeyValueMapper keyMapper = new KeyValueMapper() { - @Override - public Long apply(final String key, final Long value) { - return value; - } - }; - private final ValueJoiner joiner = new ValueJoiner() { - @Override - public String apply(final Long value1, final String value2) { - return value1 + "+" + value2; - } - }; + private final KeyValueMapper keyMapper = (key, value) -> value; + private final ValueJoiner joiner = (value1, value2) -> value1 + "+" + value2; private final String globalStore = "globalStore"; private final Map results = new HashMap<>(); private StreamsBuilder builder; @@ -96,7 +84,7 @@ public class GlobalKTableEOSIntegrationTest { private ForeachAction foreachAction; @Before - public void before() throws InterruptedException { + public void before() throws Exception { testNo++; builder = new StreamsBuilder(); createTopics(); @@ -116,16 +104,11 @@ public class GlobalKTableEOSIntegrationTest { .withValueSerde(Serdes.String())); final Consumed stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long()); stream = builder.stream(streamTopic, stringLongConsumed); - foreachAction = new ForeachAction() { - @Override - public void apply(final String key, final String value) { - results.put(key, value); - } - }; + foreachAction = results::put; } @After - public void whenShuttingDown() throws IOException { + public void whenShuttingDown() throws Exception { if (kafkaStreams != null) { kafkaStreams.close(); } @@ -147,24 +130,22 @@ public class GlobalKTableEOSIntegrationTest { expected.put("d", "4+D"); expected.put("e", "5+null"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return results.equals(expected); - } - }, 30000L, "waiting for initial values"); + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + "waiting for initial values"); produceGlobalTableValues(); - final ReadOnlyKeyValueStore replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + final ReadOnlyKeyValueStore replicatedStore = + kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + + TestUtils.waitForCondition( + () -> "J".equals(replicatedStore.get(5L)), + 30000, + "waiting for data in replicated store"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return "J".equals(replicatedStore.get(5L)); - } - }, 30000, "waiting for data in replicated store"); produceTopicValues(streamTopic); expected.put("a", "1+F"); @@ -173,12 +154,10 @@ public class GlobalKTableEOSIntegrationTest { expected.put("d", "4+I"); expected.put("e", "5+J"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return results.equals(expected); - } - }, 30000L, "waiting for final values"); + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + "waiting for final values"); } @Test @@ -195,24 +174,21 @@ public class GlobalKTableEOSIntegrationTest { expected.put("c", "3+C"); expected.put("d", "4+D"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return results.equals(expected); - } - }, 30000L, "waiting for initial values"); + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + "waiting for initial values"); produceGlobalTableValues(); - final ReadOnlyKeyValueStore replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + final ReadOnlyKeyValueStore replicatedStore = + kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return "J".equals(replicatedStore.get(5L)); - } - }, 30000, "waiting for data in replicated store"); + TestUtils.waitForCondition( + () -> "J".equals(replicatedStore.get(5L)), + 30000, + "waiting for data in replicated store"); produceTopicValues(streamTopic); @@ -222,12 +198,10 @@ public class GlobalKTableEOSIntegrationTest { expected.put("d", "4+I"); expected.put("e", "5+J"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return results.equals(expected); - } - }, 30000L, "waiting for final values"); + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + "waiting for final values"); } @Test @@ -242,12 +216,11 @@ public class GlobalKTableEOSIntegrationTest { expected.put(3L, "C"); expected.put(4L, "D"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - ReadOnlyKeyValueStore store = null; + TestUtils.waitForCondition( + () -> { + final ReadOnlyKeyValueStore store; try { - store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); } catch (final InvalidStateStoreException ex) { return false; } @@ -258,8 +231,9 @@ public class GlobalKTableEOSIntegrationTest { result.put(kv.key, kv.value); } return result.equals(expected); - } - }, 30000L, "waiting for initial values"); + }, + 30000L, + "waiting for initial values"); } @Test @@ -276,12 +250,11 @@ public class GlobalKTableEOSIntegrationTest { expected.put(3L, "C"); expected.put(4L, "D"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - ReadOnlyKeyValueStore store = null; + TestUtils.waitForCondition( + () -> { + final ReadOnlyKeyValueStore store; try { - store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); } catch (final InvalidStateStoreException ex) { return false; } @@ -292,11 +265,12 @@ public class GlobalKTableEOSIntegrationTest { result.put(kv.key, kv.value); } return result.equals(expected); - } - }, 30000L, "waiting for initial values"); + }, + 30000L, + "waiting for initial values"); } - private void createTopics() throws InterruptedException { + private void createTopics() throws Exception { streamTopic = "stream-" + testNo; globalTableTopic = "globalTable-" + testNo; CLUSTER.createTopics(streamTopic); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 013e2b6..2190d70 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -40,7 +40,6 @@ import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -48,7 +47,6 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -62,23 +60,12 @@ public class GlobalKTableIntegrationTest { private static final int NUM_BROKERS = 1; @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = - new EmbeddedKafkaCluster(NUM_BROKERS); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private static volatile int testNo = 0; private final MockTime mockTime = CLUSTER.time; - private final KeyValueMapper keyMapper = new KeyValueMapper() { - @Override - public Long apply(final String key, final Long value) { - return value; - } - }; - private final ValueJoiner joiner = new ValueJoiner() { - @Override - public String apply(final Long value1, final String value2) { - return value1 + "+" + value2; - } - }; + private final KeyValueMapper keyMapper = (key, value) -> value; + private final ValueJoiner joiner = (value1, value2) -> value1 + "+" + value2; private final String globalStore = "globalStore"; private final Map results = new HashMap<>(); private StreamsBuilder builder; @@ -91,15 +78,14 @@ public class GlobalKTableIntegrationTest { private ForeachAction foreachAction; @Before - public void before() throws InterruptedException { + public void before() throws Exception { testNo++; builder = new StreamsBuilder(); createTopics(); streamsConfiguration = new Properties(); final String applicationId = "globalTableTopic-table-test-" + testNo; streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - streamsConfiguration - .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); @@ -111,16 +97,11 @@ public class GlobalKTableIntegrationTest { .withValueSerde(Serdes.String())); final Consumed stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long()); stream = builder.stream(streamTopic, stringLongConsumed); - foreachAction = new ForeachAction() { - @Override - public void apply(final String key, final String value) { - results.put(key, value); - } - }; + foreachAction = results::put; } @After - public void whenShuttingDown() throws IOException { + public void whenShuttingDown() throws Exception { if (kafkaStreams != null) { kafkaStreams.close(); } @@ -142,24 +123,22 @@ public class GlobalKTableIntegrationTest { expected.put("d", "4+D"); expected.put("e", "5+null"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return results.equals(expected); - } - }, 30000L, "waiting for initial values"); + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + "waiting for initial values"); produceGlobalTableValues(); - final ReadOnlyKeyValueStore replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + final ReadOnlyKeyValueStore replicatedStore = + kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + + TestUtils.waitForCondition( + () -> "J".equals(replicatedStore.get(5L)), + 30000, + "waiting for data in replicated store"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return "J".equals(replicatedStore.get(5L)); - } - }, 30000, "waiting for data in replicated store"); produceTopicValues(streamTopic); expected.put("a", "1+F"); @@ -168,12 +147,10 @@ public class GlobalKTableIntegrationTest { expected.put("d", "4+I"); expected.put("e", "5+J"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return results.equals(expected); - } - }, 30000L, "waiting for final values"); + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + "waiting for final values"); } @Test @@ -190,24 +167,21 @@ public class GlobalKTableIntegrationTest { expected.put("c", "3+C"); expected.put("d", "4+D"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return results.equals(expected); - } - }, 30000L, "waiting for initial values"); + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + "waiting for initial values"); produceGlobalTableValues(); - final ReadOnlyKeyValueStore replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + final ReadOnlyKeyValueStore replicatedStore = + kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return "J".equals(replicatedStore.get(5L)); - } - }, 30000, "waiting for data in replicated store"); + TestUtils.waitForCondition( + () -> "J".equals(replicatedStore.get(5L)), + 30000, + "waiting for data in replicated store"); produceTopicValues(streamTopic); @@ -217,12 +191,10 @@ public class GlobalKTableIntegrationTest { expected.put("d", "4+I"); expected.put("e", "5+J"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return results.equals(expected); - } - }, 30000L, "waiting for final values"); + TestUtils.waitForCondition( + () -> results.equals(expected), + 30000L, + "waiting for final values"); } @Test @@ -245,7 +217,7 @@ public class GlobalKTableIntegrationTest { assertThat(store.approximateNumEntries(), equalTo(4L)); } - private void createTopics() throws InterruptedException { + private void createTopics() throws Exception { streamTopic = "stream-" + testNo; globalTableTopic = "globalTable-" + testNo; CLUSTER.createTopics(streamTopic); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java index e0bec90..7c16927 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java @@ -17,30 +17,26 @@ package org.apache.kafka.streams.integration; -import java.time.Duration; +import kafka.utils.MockTime; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -48,14 +44,12 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; -import kafka.utils.MockTime; - import static org.junit.Assert.assertEquals; @Category({IntegrationTest.class}) @@ -71,8 +65,7 @@ public class GlobalThreadShutDownOrderTest { } @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = - new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG); private final MockTime mockTime = CLUSTER.time; private final String globalStore = "globalStore"; @@ -81,20 +74,17 @@ public class GlobalThreadShutDownOrderTest { private KafkaStreams kafkaStreams; private String globalStoreTopic; private String streamTopic; - private KStream stream; - private List retrievedValuesList = new ArrayList<>(); + private final List retrievedValuesList = new ArrayList<>(); private boolean firstRecordProcessed; @Before - public void before() throws InterruptedException { - + public void before() throws Exception { builder = new StreamsBuilder(); createTopics(); streamsConfiguration = new Properties(); final String applicationId = "global-thread-shutdown-test"; streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - streamsConfiguration - .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); @@ -103,29 +93,26 @@ public class GlobalThreadShutDownOrderTest { final Consumed stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long()); - final KeyValueStoreBuilder storeBuilder = new KeyValueStoreBuilder<>(Stores.persistentKeyValueStore(globalStore), - Serdes.String(), - Serdes.Long(), - mockTime); - - builder.addGlobalStore(storeBuilder, - globalStoreTopic, - Consumed.with(Serdes.String(), Serdes.Long()), - new MockProcessorSupplier()); + final KeyValueStoreBuilder storeBuilder = new KeyValueStoreBuilder<>( + Stores.persistentKeyValueStore(globalStore), + Serdes.String(), + Serdes.Long(), + mockTime); - stream = builder.stream(streamTopic, stringLongConsumed); + builder.addGlobalStore( + storeBuilder, + globalStoreTopic, + Consumed.with(Serdes.String(), Serdes.Long()), + new MockProcessorSupplier()); - stream.process(new ProcessorSupplier() { - @Override - public Processor get() { - return new GlobalStoreProcessor(globalStore); - } - }); + builder + .stream(streamTopic, stringLongConsumed) + .process(() -> new GlobalStoreProcessor(globalStore)); } @After - public void whenShuttingDown() throws IOException { + public void whenShuttingDown() throws Exception { if (kafkaStreams != null) { kafkaStreams.close(); } @@ -141,12 +128,10 @@ public class GlobalThreadShutDownOrderTest { kafkaStreams.start(); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return firstRecordProcessed; - } - }, 30000, "Has not processed record within 30 seconds"); + TestUtils.waitForCondition( + () -> firstRecordProcessed, + 30000, + "Has not processed record within 30 seconds"); kafkaStreams.close(Duration.ofSeconds(30)); @@ -155,7 +140,7 @@ public class GlobalThreadShutDownOrderTest { } - private void createTopics() throws InterruptedException { + private void createTopics() throws Exception { streamTopic = "stream-topic"; globalStoreTopic = "global-store-topic"; CLUSTER.createTopics(streamTopic); @@ -186,7 +171,6 @@ public class GlobalThreadShutDownOrderTest { private final String storeName; GlobalStoreProcessor(final String storeName) { - this.storeName = storeName; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java index 1707679..32d77c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.integration; -import java.time.Duration; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serdes; @@ -30,13 +29,11 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.WallclockTimestampExtractor; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -45,7 +42,7 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -95,19 +92,14 @@ public class KTableSourceTopicRestartIntegrationTest { @Before public void before() { final KTable kTable = streamsBuilder.table(SOURCE_TOPIC, Materialized.as("store")); - kTable.toStream().foreach(new ForeachAction() { - @Override - public void apply(final String key, final String value) { - readKeyValues.put(key, value); - } - }); + kTable.toStream().foreach(readKeyValues::put); expectedInitialResultsMap = createExpectedResultsMap("a", "b", "c"); expectedResultsWithDataWrittenDuringRestoreMap = createExpectedResultsMap("a", "b", "c", "d", "f", "g", "h"); } @After - public void after() throws IOException { + public void after() throws Exception { IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG); } @@ -129,7 +121,10 @@ public class KTableSourceTopicRestartIntegrationTest { produceKeyValues("f", "g", "h"); - assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart"); + assertNumberValuesRead( + readKeyValues, + expectedResultsWithDataWrittenDuringRestoreMap, + "Table did not get all values after restart"); } finally { streamsOne.close(Duration.ofSeconds(5)); } @@ -154,7 +149,10 @@ public class KTableSourceTopicRestartIntegrationTest { produceKeyValues("f", "g", "h"); - assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart"); + assertNumberValuesRead( + readKeyValues, + expectedResultsWithDataWrittenDuringRestoreMap, + "Table did not get all values after restart"); } finally { streamsOne.close(Duration.ofSeconds(5)); } @@ -188,12 +186,7 @@ public class KTableSourceTopicRestartIntegrationTest { final Map expectedMap, final String errorMessage) throws InterruptedException { TestUtils.waitForCondition( - new TestCondition() { - @Override - public boolean conditionMet() { - return valueMap.equals(expectedMap); - } - }, + () -> valueMap.equals(expectedMap), 30 * 1000L, errorMessage); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java index 96d7d14..4c7859b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.integration; -import java.time.Duration; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.producer.ProducerConfig; @@ -44,14 +43,13 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ExecutionException; @Category({IntegrationTest.class}) public class PurgeRepartitionTopicIntegrationTest { @@ -64,18 +62,18 @@ public class PurgeRepartitionTopicIntegrationTest { private static AdminClient adminClient; private static KafkaStreams kafkaStreams; - private static Integer purgeIntervalMs = 10; - private static Integer purgeSegmentBytes = 2000; + private static final Integer PURGE_INTERVAL_MS = 10; + private static final Integer PURGE_SEGMENT_BYTES = 2000; @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties() { { - put("log.retention.check.interval.ms", purgeIntervalMs); + put("log.retention.check.interval.ms", PURGE_INTERVAL_MS); put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 0); } }); - private Time time = CLUSTER.time; + private final Time time = CLUSTER.time; private class RepartitionTopicCreatedWithExpectedConfigs implements TestCondition { @Override @@ -92,11 +90,14 @@ public class PurgeRepartitionTopicIntegrationTest { try { final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, REPARTITION_TOPIC); - final Config config = adminClient.describeConfigs(Collections.singleton(resource)) - .values().get(resource).get(); + final Config config = adminClient + .describeConfigs(Collections.singleton(resource)) + .values() + .get(resource) + .get(); return config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value().equals(TopicConfig.CLEANUP_POLICY_DELETE) - && config.get(TopicConfig.SEGMENT_MS_CONFIG).value().equals(purgeIntervalMs.toString()) - && config.get(TopicConfig.SEGMENT_BYTES_CONFIG).value().equals(purgeSegmentBytes.toString()); + && config.get(TopicConfig.SEGMENT_MS_CONFIG).value().equals(PURGE_INTERVAL_MS.toString()) + && config.get(TopicConfig.SEGMENT_BYTES_CONFIG).value().equals(PURGE_SEGMENT_BYTES.toString()); } catch (final Exception e) { return false; } @@ -104,7 +105,6 @@ public class PurgeRepartitionTopicIntegrationTest { } private interface TopicSizeVerifier { - boolean verify(long currentSize); } @@ -117,18 +117,19 @@ public class PurgeRepartitionTopicIntegrationTest { @Override public final boolean conditionMet() { - time.sleep(purgeIntervalMs); + time.sleep(PURGE_INTERVAL_MS); try { - final Collection logDirInfo = adminClient.describeLogDirs(Collections.singleton(0)).values().get(0).get().values(); + final Collection logDirInfo = + adminClient.describeLogDirs(Collections.singleton(0)).values().get(0).get().values(); for (final DescribeLogDirsResponse.LogDirInfo partitionInfo : logDirInfo) { - final DescribeLogDirsResponse.ReplicaInfo replicaInfo = partitionInfo.replicaInfos.get(new TopicPartition(REPARTITION_TOPIC, 0)); + final DescribeLogDirsResponse.ReplicaInfo replicaInfo = + partitionInfo.replicaInfos.get(new TopicPartition(REPARTITION_TOPIC, 0)); if (replicaInfo != null && verifier.verify(replicaInfo.size)) { return true; } } - } catch (final Exception e) { // swallow } @@ -138,7 +139,7 @@ public class PurgeRepartitionTopicIntegrationTest { } @BeforeClass - public static void createTopics() throws InterruptedException { + public static void createTopics() throws Exception { CLUSTER.createTopic(INPUT_TOPIC, 1, 1); } @@ -151,18 +152,17 @@ public class PurgeRepartitionTopicIntegrationTest { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, purgeIntervalMs); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, PURGE_INTERVAL_MS); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(APPLICATION_ID).getPath()); - streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), purgeIntervalMs); - streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), purgeSegmentBytes); - streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), purgeSegmentBytes / 2); // we cannot allow batch size larger than segment size + streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), PURGE_INTERVAL_MS); + streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), PURGE_SEGMENT_BYTES); + streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), PURGE_SEGMENT_BYTES / 2); // we cannot allow batch size larger than segment size streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); final StreamsBuilder builder = new StreamsBuilder(); - builder.stream(INPUT_TOPIC) .groupBy(MockMapper.selectKeyKeyValueMapper()) .count(); @@ -171,15 +171,14 @@ public class PurgeRepartitionTopicIntegrationTest { } @After - public void shutdown() throws IOException { + public void shutdown() { if (kafkaStreams != null) { kafkaStreams.close(Duration.ofSeconds(30)); } } - @Test - public void shouldRestoreState() throws InterruptedException, ExecutionException { + public void shouldRestoreState() throws Exception { // produce some data to input topic final List> messages = new ArrayList<>(); for (int i = 0; i < 1000; i++) { @@ -198,26 +197,16 @@ public class PurgeRepartitionTopicIntegrationTest { "Repartition topic " + REPARTITION_TOPIC + " not created with the expected configs after 60000 ms."); TestUtils.waitForCondition( - new RepartitionTopicVerified(new TopicSizeVerifier() { - @Override - public boolean verify(final long currentSize) { - return currentSize > 0; - } - }), - 60000, - "Repartition topic " + REPARTITION_TOPIC + " not received data after 60000 ms." + new RepartitionTopicVerified(currentSize -> currentSize > 0), + 60000, + "Repartition topic " + REPARTITION_TOPIC + " not received data after 60000 ms." ); // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side TestUtils.waitForCondition( - new RepartitionTopicVerified(new TopicSizeVerifier() { - @Override - public boolean verify(final long currentSize) { - return currentSize <= purgeSegmentBytes; - } - }), - 60000, - "Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms." + new RepartitionTopicVerified(currentSize -> currentSize <= PURGE_SEGMENT_BYTES), + 60000, + "Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms." ); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 06014a1..856a85d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -99,8 +99,7 @@ public class QueryableStateIntegrationTest { private static final int NUM_BROKERS = 1; @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = - new EmbeddedKafkaCluster(NUM_BROKERS); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private static final int STREAM_THREE_PARTITIONS = 4; private final MockTime mockTime = CLUSTER.time; private String streamOne = "stream-one"; @@ -292,27 +291,31 @@ public class QueryableStateIntegrationTest { final Set keys, final String storeName) throws Exception { for (final String key : keys) { - TestUtils.waitForCondition(() -> { - try { - final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer()); - - if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) { + TestUtils.waitForCondition( + () -> { + try { + final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer()); + + if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) { + return false; + } + final int index = metadata.hostInfo().port(); + final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); + final ReadOnlyKeyValueStore store = + streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore()); + + return store != null && store.get(key) != null; + } catch (final IllegalStateException e) { + // Kafka Streams instance may have closed but rebalance hasn't happened + return false; + } catch (final InvalidStateStoreException e) { + // there must have been at least one rebalance state + assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1); return false; } - final int index = metadata.hostInfo().port(); - final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); - final ReadOnlyKeyValueStore store = streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore()); - - return store != null && store.get(key) != null; - } catch (final IllegalStateException e) { - // Kafka Streams instance may have closed but rebalance hasn't happened - return false; - } catch (final InvalidStateStoreException e) { - // there must have been at least one rebalance state - assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1); - return false; - } - }, 120000, "waiting for metadata, store and value to be non null"); + }, + 120000, + "waiting for metadata, store and value to be non null"); } } @@ -324,25 +327,29 @@ public class QueryableStateIntegrationTest { final Long from, final Long to) throws Exception { for (final String key : keys) { - TestUtils.waitForCondition(() -> { - try { - final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer()); - if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) { + TestUtils.waitForCondition( + () -> { + try { + final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer()); + if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) { + return false; + } + final int index = metadata.hostInfo().port(); + final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); + final ReadOnlyWindowStore store = + streamsWithKey.store(storeName, QueryableStoreTypes.windowStore()); + return store != null && store.fetch(key, ofEpochMilli(from), ofEpochMilli(to)) != null; + } catch (final IllegalStateException e) { + // Kafka Streams instance may have closed but rebalance hasn't happened + return false; + } catch (final InvalidStateStoreException e) { + // there must have been at least one rebalance state + assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1); return false; } - final int index = metadata.hostInfo().port(); - final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); - final ReadOnlyWindowStore store = streamsWithKey.store(storeName, QueryableStoreTypes.windowStore()); - return store != null && store.fetch(key, ofEpochMilli(from), ofEpochMilli(to)) != null; - } catch (final IllegalStateException e) { - // Kafka Streams instance may have closed but rebalance hasn't happened - return false; - } catch (final InvalidStateStoreException e) { - // there must have been at least one rebalance state - assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1); - return false; - } - }, 120000, "waiting for metadata, store and value to be non null"); + }, + 120000, + "waiting for metadata, store and value to be non null"); } } @@ -359,7 +366,13 @@ public class QueryableStateIntegrationTest { final String storeName = "word-count-store"; final String windowStoreName = "windowed-word-count-store"; for (int i = 0; i < numThreads; i++) { - streamRunnables[i] = new StreamRunnable(streamThree, outputTopicThree, outputTopicConcurrentWindowed, storeName, windowStoreName, i); + streamRunnables[i] = new StreamRunnable( + streamThree, + outputTopicThree, + outputTopicConcurrentWindowed, + storeName, + windowStoreName, + i); streamThreads[i] = new Thread(streamRunnables[i]); streamThreads[i].start(); } @@ -368,10 +381,20 @@ public class QueryableStateIntegrationTest { waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1); for (int i = 0; i < numThreads; i++) { - verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys, + verifyAllKVKeys( + streamRunnables, + streamRunnables[i].getStream(), + streamRunnables[i].getStateListener(), + inputValuesKeys, storeName + "-" + streamThree); - verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys, - windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE); + verifyAllWindowedKeys( + streamRunnables, + streamRunnables[i].getStream(), + streamRunnables[i].getStateListener(), + inputValuesKeys, + windowStoreName + "-" + streamThree, + 0L, + WINDOW_SIZE); assertEquals(KafkaStreams.State.RUNNING, streamRunnables[i].getStream().state()); } @@ -383,10 +406,20 @@ public class QueryableStateIntegrationTest { } // query from the remaining thread - verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys, + verifyAllKVKeys( + streamRunnables, + streamRunnables[0].getStream(), + streamRunnables[0].getStateListener(), + inputValuesKeys, storeName + "-" + streamThree); - verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys, - windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE); + verifyAllWindowedKeys( + streamRunnables, + streamRunnables[0].getStream(), + streamRunnables[0].getStateListener(), + inputValuesKeys, + windowStoreName + "-" + streamThree, + 0L, + WINDOW_SIZE); assertEquals(KafkaStreams.State.RUNNING, streamRunnables[0].getStream().state()); } finally { for (int i = 0; i < numThreads; i++) { @@ -407,7 +440,13 @@ public class QueryableStateIntegrationTest { final ProducerRunnable producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations); final Thread producerThread = new Thread(producerRunnable); - kafkaStreams = createCountStream(streamConcurrent, outputTopicConcurrent, outputTopicConcurrentWindowed, storeName, windowStoreName, streamsConfiguration); + kafkaStreams = createCountStream( + streamConcurrent, + outputTopicConcurrent, + outputTopicConcurrentWindowed, + storeName, + windowStoreName, + streamsConfiguration); kafkaStreams.start(); producerThread.start(); @@ -416,8 +455,8 @@ public class QueryableStateIntegrationTest { waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, numberOfWordsPerIteration); waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration); - final ReadOnlyKeyValueStore - keyValueStore = kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore()); + final ReadOnlyKeyValueStore keyValueStore = + kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore()); final ReadOnlyWindowStore windowStore = kafkaStreams.store(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.windowStore()); @@ -459,7 +498,8 @@ public class QueryableStateIntegrationTest { new KeyValue<>(keys[3], 5L), new KeyValue<>(keys[4], 2L)) ); - final Set> expectedBatch1 = new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L))); + final Set> expectedBatch1 = + new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L))); IntegrationTestUtils.produceKeyValuesSynchronously( streamOne, @@ -535,7 +575,10 @@ public class QueryableStateIntegrationTest { mockTime); final KTable t1 = builder.table(streamOne); - t1.mapValues((ValueMapper) Long::valueOf, Materialized.>as("queryMapValues").withValueSerde(Serdes.Long())) + t1 + .mapValues( + (ValueMapper) Long::valueOf, + Materialized.>as("queryMapValues").withValueSerde(Serdes.Long())) .toStream() .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); @@ -544,9 +587,8 @@ public class QueryableStateIntegrationTest { waitUntilAtLeastNumRecordProcessed(outputTopic, 5); - final ReadOnlyKeyValueStore - myMapStore = kafkaStreams.store("queryMapValues", - QueryableStoreTypes.keyValueStore()); + final ReadOnlyKeyValueStore myMapStore = + kafkaStreams.store("queryMapValues", QueryableStoreTypes.keyValueStore()); for (final KeyValue batchEntry : batch1) { assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key)); } @@ -566,7 +608,8 @@ public class QueryableStateIntegrationTest { new KeyValue<>(keys[3], "5"), new KeyValue<>(keys[4], "2")) ); - final Set> expectedBatch1 = new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L))); + final Set> expectedBatch1 = + new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L))); IntegrationTestUtils.produceKeyValuesSynchronously( streamOne, @@ -581,7 +624,10 @@ public class QueryableStateIntegrationTest { final Predicate filterPredicate = (key, value) -> key.contains("kafka"); final KTable t1 = builder.table(streamOne); final KTable t2 = t1.filter(filterPredicate, Materialized.as("queryFilter")); - final KTable t3 = t2.mapValues((ValueMapper) Long::valueOf, Materialized.>as("queryMapValues").withValueSerde(Serdes.Long())); + final KTable t3 = t2 + .mapValues( + (ValueMapper) Long::valueOf, + Materialized.>as("queryMapValues").withValueSerde(Serdes.Long())); t3.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); @@ -596,7 +642,8 @@ public class QueryableStateIntegrationTest { assertEquals(myMapStore.get(expectedEntry.key), expectedEntry.value); } for (final KeyValue batchEntry : batch1) { - final KeyValue batchEntryMapValue = new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value)); + final KeyValue batchEntryMapValue = + new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value)); if (!expectedBatch1.contains(batchEntryMapValue)) { assertNull(myMapStore.get(batchEntry.key)); } @@ -685,11 +732,19 @@ public class QueryableStateIntegrationTest { mockTime); final int maxWaitMs = 30000; - TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName); - final ReadOnlyKeyValueStore store = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore()); + TestUtils.waitForCondition( + new WaitForStore(storeName), + maxWaitMs, + "waiting for store " + storeName); - TestUtils.waitForCondition(() -> new Long(8).equals(store.get("hello")), maxWaitMs, "wait for count to be 8"); + final ReadOnlyKeyValueStore store = + kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore()); + + TestUtils.waitForCondition( + () -> new Long(8).equals(store.get("hello")), + maxWaitMs, + "wait for count to be 8"); // close stream kafkaStreams.close(); @@ -699,14 +754,19 @@ public class QueryableStateIntegrationTest { kafkaStreams.start(); // make sure we never get any value other than 8 for hello - TestUtils.waitForCondition(() -> { - try { - assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore()).get("hello")); - return true; - } catch (final InvalidStateStoreException ise) { - return false; - } - }, maxWaitMs, "waiting for store " + storeName); + TestUtils.waitForCondition( + () -> { + try { + assertEquals( + Long.valueOf(8L), + kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore()).get("hello")); + return true; + } catch (final InvalidStateStoreException ise) { + return false; + } + }, + maxWaitMs, + "waiting for store " + storeName); } @@ -756,7 +816,11 @@ public class QueryableStateIntegrationTest { IntegrationTestUtils.produceKeyValuesSynchronously( streamOne, - Arrays.asList(KeyValue.pair("a", "1"), KeyValue.pair("a", "2"), KeyValue.pair("b", "3"), KeyValue.pair("b", "4")), + Arrays.asList( + KeyValue.pair("a", "1"), + KeyValue.pair("a", "2"), + KeyValue.pair("b", "3"), + KeyValue.pair("b", "4")), TestUtils.producerConfig( CLUSTER.bootstrapServers(), StringSerializer.class, @@ -765,9 +829,14 @@ public class QueryableStateIntegrationTest { mockTime); final int maxWaitMs = 30000; - TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName); - final ReadOnlyKeyValueStore store = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore()); + TestUtils.waitForCondition( + new WaitForStore(storeName), + maxWaitMs, + "waiting for store " + storeName); + + final ReadOnlyKeyValueStore store = + kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore()); TestUtils.waitForCondition( () -> "12".equals(store.get("a")) && "34".equals(store.get("b")), @@ -784,21 +853,29 @@ public class QueryableStateIntegrationTest { new Properties()), mockTime); - TestUtils.waitForCondition(failed::get, 30000, "wait for thread to fail"); - - TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName); + TestUtils.waitForCondition( + failed::get, + maxWaitMs, + "wait for thread to fail"); + TestUtils.waitForCondition( + new WaitForStore(storeName), + maxWaitMs, + "waiting for store " + storeName); - final ReadOnlyKeyValueStore store2 = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore()); + final ReadOnlyKeyValueStore store2 = + kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore()); try { - TestUtils.waitForCondition(() -> - ("125".equals(store2.get("a")) - || "1225".equals(store2.get("a")) - || "12125".equals(store2.get("a"))) - && - ("34".equals(store2.get("b")) - || "344".equals(store2.get("b")) - || "3434".equals(store2.get("b"))), maxWaitMs, "wait for agg to be |||| and ||||"); + TestUtils.waitForCondition( + () -> ("125".equals(store2.get("a")) + || "1225".equals(store2.get("a")) + || "12125".equals(store2.get("a"))) + && + ("34".equals(store2.get("b")) + || "344".equals(store2.get("b")) + || "3434".equals(store2.get("b"))), + maxWaitMs, + "wait for agg to be |||| and ||||"); } catch (final Throwable t) { throw new RuntimeException("Store content is a: " + store2.get("a") + "; b: " + store2.get("b"), t); } @@ -914,7 +991,8 @@ public class QueryableStateIntegrationTest { } - private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs) throws Exception { + private void waitUntilAtLeastNumRecordProcessed(final String topic, + final int numRecs) throws Exception { final Properties config = new Properties(); config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer"); @@ -930,7 +1008,8 @@ public class QueryableStateIntegrationTest { private Set> fetch(final ReadOnlyWindowStore store, final String key) { - final WindowStoreIterator fetch = store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis())); + final WindowStoreIterator fetch = + store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis())); if (fetch.hasNext()) { final KeyValue next = fetch.next(); return Collections.singleton(KeyValue.pair(key, next.value)); @@ -940,7 +1019,8 @@ public class QueryableStateIntegrationTest { private Map fetchMap(final ReadOnlyWindowStore store, final String key) { - final WindowStoreIterator fetch = store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis())); + final WindowStoreIterator fetch = + store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis())); if (fetch.hasNext()) { final KeyValue next = fetch.next(); return Collections.singletonMap(key, next.value); @@ -958,7 +1038,9 @@ public class QueryableStateIntegrationTest { private int currIteration = 0; boolean shutdown = false; - ProducerRunnable(final String topic, final List inputValues, final int numIterations) { + ProducerRunnable(final String topic, + final List inputValues, + final int numIterations) { this.topic = topic; this.inputValues = inputValues; this.numIterations = numIterations;