From: guozhang@apache.org
To: commits@kafka.apache.org
Date: Fri, 08 Jun 2018 17:55:21 +0000
Subject: [kafka] branch 2.0 updated: KAFKA-5697: Use nonblocking poll in Streams (#5107)
Message-ID: <152848052091.5669.3148942639176553744@gitbox.apache.org>

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/2.0 by this push:
     new 3ab6e75  KAFKA-5697: Use nonblocking poll in Streams (#5107)
3ab6e75 is described below

commit 3ab6e75aaebfd68d9b043586b26c46f2c12d4c3a
Author: John Roesler
AuthorDate: Fri Jun 8 12:54:26 2018 -0500

    KAFKA-5697: Use nonblocking poll in Streams (#5107)

    Make use of the new Consumer#poll(Duration) to avoid getting stuck in
    poll when the broker is unavailable.

    Reviewers: Matthias J. Sax, Guozhang Wang, Bill Bejeck
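For readers on the old consumer API, the behavioral difference this commit relies on is easiest to see in isolation. Below is a minimal sketch, not part of this commit; the broker address, group id, and topic are placeholders. The deprecated poll(long) can block indefinitely fetching metadata when no broker is reachable, while the Duration overload introduced by KIP-266 bounds total blocking time and simply returns an empty batch.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NonblockingPollSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("group.id", "poll-sketch");              // placeholder group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("anyTopic")); // placeholder topic

                // Old API: may block far beyond 100 ms (e.g. waiting for metadata)
                // if the broker is unavailable:
                //     consumer.poll(100);

                // New API (KIP-266): 100 ms is a hard bound on total blocking time;
                // an unreachable broker yields an empty batch instead of a hang.
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                System.out.println("received " + records.count() + " record(s)");
            }
        }
    }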
---
 .../internals/GlobalStateManagerImpl.java          |   5 +-
 .../processor/internals/GlobalStreamThread.java    |  14 +-
 .../processor/internals/StoreChangelogReader.java  |   7 +-
 .../streams/processor/internals/StreamThread.java  |  28 +--
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  23 ++-
 .../integration/utils/IntegrationTestUtils.java    |   3 +-
 .../apache/kafka/streams/perf/SimpleBenchmark.java |   5 +-
 .../processor/internals/AbstractTaskTest.java      |   3 +-
 .../processor/internals/StandbyTaskTest.java       |  12 +-
 .../processor/internals/StateConsumerTest.java     |   5 +-
 .../internals/StoreChangelogReaderTest.java        |   5 +-
 .../processor/internals/StreamTaskTest.java        |   3 +-
 .../StreamThreadStateStoreProviderTest.java        |   3 +-
 .../streams/tests/BrokerCompatibilityTest.java     |   3 +-
 .../apache/kafka/streams/tests/EosTestDriver.java  | 190 ++++++++++-----------
 .../kafka/streams/tests/SmokeTestDriver.java       |   3 +-
 .../kafka/streams/tools/StreamsResetterTest.java   |  21 +--
 .../org/apache/kafka/test/MockRestoreConsumer.java |   4 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   2 +
 19 files changed, 188 insertions(+), 151 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index e8ec5e9..4fd7a59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -60,6 +61,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
     private InternalProcessorContext processorContext;
     private final int retries;
     private final long retryBackoffMs;
+    private final Duration pollTime;
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
@@ -76,6 +78,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
         this.stateRestoreListener = stateRestoreListener;
         this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
         this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
     }
 
     @Override
@@ -262,7 +265,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
 
         while (offset < highWatermark) {
             try {
-                final ConsumerRecords records = globalConsumer.poll(100);
+                final ConsumerRecords records = globalConsumer.poll(pollTime);
                 final List> restoreRecords = new ArrayList<>();
                 for (ConsumerRecord record : records) {
                     if (record.key() != null) {
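The change above also replaces a hard-coded 100 ms with the application's poll.ms. Resolving that config into a Duration once, at construction time, looks like this on its own — a sketch, with illustrative property values:

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class PollTimeFromConfig {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sketch-app");        // required config
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(StreamsConfig.POLL_MS_CONFIG, 100L);                       // poll.ms, default 100

            final StreamsConfig config = new StreamsConfig(props);
            // Resolve the timeout once, at construction time, as the commit does:
            final Duration pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
            System.out.println("poll time = " + pollTime.toMillis() + " ms");
        }
    }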
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 112011f..9d529c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -200,7 +201,7 @@ public class GlobalStreamThread extends Thread {
         private final Consumer globalConsumer;
         private final GlobalStateMaintainer stateMaintainer;
         private final Time time;
-        private final long pollMs;
+        private final Duration pollTime;
         private final long flushInterval;
         private final Logger log;
 
@@ -210,13 +211,13 @@ public class GlobalStreamThread extends Thread {
                       final Consumer globalConsumer,
                       final GlobalStateMaintainer stateMaintainer,
                       final Time time,
-                      final long pollMs,
+                      final Duration pollTime,
                       final long flushInterval) {
             this.log = logContext.logger(getClass());
             this.globalConsumer = globalConsumer;
             this.stateMaintainer = stateMaintainer;
             this.time = time;
-            this.pollMs = pollMs;
+            this.pollTime = pollTime;
             this.flushInterval = flushInterval;
         }
 
@@ -235,7 +236,7 @@ public class GlobalStreamThread extends Thread {
         void pollAndUpdate() {
             try {
-                final ConsumerRecords received = globalConsumer.poll(pollMs);
+                final ConsumerRecords received = globalConsumer.poll(pollTime);
                 for (final ConsumerRecord record : received) {
                     stateMaintainer.update(record);
                 }
@@ -338,8 +339,9 @@ public class GlobalStreamThread extends Thread {
                     logContext
                 ),
                 time,
-                config.getLong(StreamsConfig.POLL_MS_CONFIG),
-                config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
+                Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)),
+                config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
+            );
 
             stateConsumer.initialize();
             return stateConsumer;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index bb0ed06..07af801 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.slf4j.Logger;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -49,11 +50,14 @@ public class StoreChangelogReader implements ChangelogReader {
     private final Map stateRestorers = new HashMap<>();
     private final Map needsRestoring = new HashMap<>();
     private final Map needsInitializing = new HashMap<>();
+    private final Duration pollTime;
 
     public StoreChangelogReader(final Consumer restoreConsumer,
+                                final Duration pollTime,
                                 final StateRestoreListener userStateRestoreListener,
                                 final LogContext logContext) {
         this.restoreConsumer = restoreConsumer;
+        this.pollTime = pollTime;
         this.log = logContext.logger(getClass());
         this.userStateRestoreListener = userStateRestoreListener;
     }
@@ -76,7 +80,7 @@ public class StoreChangelogReader implements ChangelogReader {
         }
 
         try {
-            final ConsumerRecords records = restoreConsumer.poll(10);
+            final ConsumerRecords records = restoreConsumer.poll(pollTime);
             final Iterator iterator = needsRestoring.keySet().iterator();
             while (iterator.hasNext()) {
                 final TopicPartition partition = iterator.next();
@@ -295,6 +299,7 @@ public class StoreChangelogReader implements ChangelogReader {
                 return true;
             }
         }
+        return false;
     }
 }
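Threading pollTime through the StoreChangelogReader constructor is also what lets the tests later in this diff pass Duration.ZERO: a zero-duration poll returns whatever is already buffered without blocking. A self-contained illustration with MockConsumer — a sketch only; the topic name and offsets are made up:

    import java.time.Duration;
    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    public class ZeroPollSketch {
        public static void main(final String[] args) {
            final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
            final TopicPartition tp = new TopicPartition("changelog", 0); // placeholder changelog topic

            consumer.assign(Collections.singletonList(tp));
            consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
            consumer.addRecord(new ConsumerRecord<>("changelog", 0, 0L, new byte[0], new byte[0]));

            // Duration.ZERO hands back the buffered records immediately; there is
            // nothing to block on, which is exactly what restore-path tests want.
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ZERO);
            System.out.println("restored " + records.count() + " record(s) without blocking");
        }
    }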
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e72c4a5..a159e7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -50,6 +50,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -212,7 +213,7 @@ public class StreamThread extends Thread {
             if (newState == State.RUNNING) {
                 updateThreadMetadata(taskManager.activeTasks(), taskManager.standbyTasks());
             } else {
-                updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
+                updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
             }
         }
 
@@ -555,7 +556,7 @@ public class StreamThread extends Thread {
     }
 
     private final Time time;
-    private final long pollTimeMs;
+    private final Duration pollTime;
     private final long commitTimeMs;
     private final Object stateLock;
     private final Logger log;
@@ -602,7 +603,8 @@ public class StreamThread extends Thread {
         log.info("Creating restore consumer client");
         final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
         final Consumer restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext);
+        final Duration pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, pollTime, userStateRestoreListener, logContext);
 
         Producer threadProducer = null;
         final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
@@ -710,10 +712,10 @@ public class StreamThread extends Thread {
         this.originalReset = originalReset;
         this.versionProbingFlag = versionProbingFlag;
 
-        this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
+        this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
 
-        updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
+        updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
     }
 
     /**
@@ -801,14 +803,14 @@ public class StreamThread extends Thread {
         if (state == State.PARTITIONS_ASSIGNED) {
             // try to fetch some records with zero poll millis
             // to unblock the restoration as soon as possible
-            records = pollRequests(0L);
+            records = pollRequests(Duration.ZERO);
 
             if (taskManager.updateNewAndRestoringTasks()) {
                 setState(State.RUNNING);
             }
         } else {
             // try to fetch some records if necessary
-            records = pollRequests(pollTimeMs);
+            records = pollRequests(pollTime);
 
             // if state changed after the poll call,
             // try to initialize the assigned tasks again
@@ -843,15 +845,15 @@ public class StreamThread extends Thread {
     /**
      * Get the next batch of records by polling.
      *
-     * @param pollTimeMs poll time millis parameter for the consumer poll
+     * @param pollTime how long to block in Consumer#poll
      * @return Next batch of records or null if no records available.
     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    private ConsumerRecords pollRequests(final long pollTimeMs) {
+    private ConsumerRecords pollRequests(final Duration pollTime) {
         ConsumerRecords records = null;
 
         try {
-            records = consumer.poll(pollTimeMs);
+            records = consumer.poll(pollTime);
         } catch (final InvalidOffsetException e) {
             resetInvalidOffsets(e);
         }
@@ -1078,7 +1080,11 @@ public class StreamThread extends Thread {
             }
 
             try {
-                final ConsumerRecords records = restoreConsumer.poll(0);
+                // poll(0): Since this is during the normal processing, not during restoration.
+                // We can afford to have slower restore (because we don't wait inside poll for results).
+                // Instead, we want to proceed to the next iteration to call the main consumer#poll()
+                // as soon as possible so as to not be kicked out of the group.
+                final ConsumerRecords records = restoreConsumer.poll(Duration.ZERO);
 
                 if (!records.isEmpty()) {
                     for (final TopicPartition partition : records.partitions()) {
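The comment in that last hunk captures the loop invariant: the main consumer gets the full poll budget, because regular poll() calls are what keep the thread inside max.poll.interval.ms, while the restore consumer must never add latency. A sketch of that shape only — names are illustrative, and this is not the actual StreamThread code:

    import java.time.Duration;

    import org.apache.kafka.clients.consumer.Consumer;

    public class InterleavedPollSketch {
        // One iteration of a loop that services two consumers: the main one
        // may block up to the configured poll time, the restore one never does.
        static void runOnce(final Consumer<byte[], byte[]> mainConsumer,
                            final Consumer<byte[], byte[]> restoreConsumer,
                            final Duration pollTime) {
            // Blocking here is fine: frequent poll() calls keep this member
            // within max.poll.interval.ms and therefore in the group.
            mainConsumer.poll(pollTime);
            // Never block here; a slow restore must not delay the next main
            // poll, or the thread risks being kicked out of the group.
            restoreConsumer.poll(Duration.ZERO);
        }
    }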
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 297b243..8635b94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
@@ -25,6 +27,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -42,7 +45,6 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -234,9 +236,8 @@ public class KafkaStreamsTest {
         assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
     }
 
-    @Ignore // this test cannot pass as long as GST blocks KS.start()
     @Test
-    public void testGlobalThreadCloseWithoutConnectingToBroker() {
+    public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
@@ -244,16 +245,26 @@ public class KafkaStreamsTest {
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
 
+        // We want to configure request.timeout.ms, but it must be larger than a
+        // few other configs.
+        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 200);
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 201);
+        props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 202);
+
         final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        streams.start();
-        streams.close();
+        try {
+            streams.start();
+            fail("expected start() to time out and throw an exception.");
+        } catch (final StreamsException expected) {
+            // This is a result of not being able to connect to the broker.
+        }
         // There's nothing to assert... We're testing that this operation actually completes.
     }
 
-    @Ignore // this test cannot pass until we implement KIP-266
     @Test
     public void testLocalThreadCloseWithoutConnectingToBroker() {
         final Properties props = new Properties();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index fe897c7..86cb331 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -44,6 +44,7 @@ import scala.Option;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -464,7 +465,7 @@ public class IntegrationTestUtils {
         while (totalPollTimeMs < waitTime && continueConsuming(consumerRecords.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
-            final ConsumerRecords records = consumer.poll(pollIntervalMs);
+            final ConsumerRecords records = consumer.poll(Duration.ofMillis(pollIntervalMs));
 
             for (final ConsumerRecord record : records) {
                 consumerRecords.add(record);
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 8187467..7179293 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -54,6 +54,7 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -334,7 +335,7 @@ public class SimpleBenchmark {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords records = consumer.poll(POLL_MS);
+                final ConsumerRecords records = consumer.poll(Duration.ofMillis(POLL_MS));
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
@@ -372,7 +373,7 @@ public class SimpleBenchmark {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords records = consumer.poll(POLL_MS);
+                final ConsumerRecords records = consumer.poll(Duration.ofMillis(POLL_MS));
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
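IntegrationTestUtils' loop above is the usual bounded-drain pattern: cap the total wait, poll in small bounded slices, and stop early once enough records have arrived. In isolation it might look like this — a sketch; the 100 ms slice is arbitrary:

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public final class BoundedDrain {
        // Poll in small slices until maxMessages arrive or waitTimeMs elapses.
        static <K, V> List<ConsumerRecord<K, V>> drain(final Consumer<K, V> consumer,
                                                       final int maxMessages,
                                                       final long waitTimeMs) {
            final List<ConsumerRecord<K, V>> out = new ArrayList<>();
            final long pollIntervalMs = 100L;
            long totalPollTimeMs = 0;
            while (totalPollTimeMs < waitTimeMs && out.size() < maxMessages) {
                totalPollTimeMs += pollIntervalMs;
                // Each slice is bounded, so a dead broker cannot hang the loop.
                for (final ConsumerRecord<K, V> record : consumer.poll(Duration.ofMillis(pollIntervalMs))) {
                    out.add(record);
                }
            }
            return out;
        }
    }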
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 347e9c4..4ed44be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -233,7 +234,7 @@ public class AbstractTaskTest {
                                storeTopicPartitions,
                                ProcessorTopology.withLocalStores(new ArrayList<>(stateStoresToChangelogTopics.keySet()), storeNamesToChangelogTopics),
                                consumer,
-                               new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
+                               new StoreChangelogReader(consumer, Duration.ZERO, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
                                false,
                                stateDirectory,
                                config) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 93d6a0d..05d0e3d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -50,6 +50,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -122,7 +123,12 @@ public class StandbyTaskTest {
     private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
 
     private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("standby-task-test "));
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(
+        restoreStateConsumer,
+        Duration.ZERO,
+        stateRestoreListener,
+        new LogContext("standby-task-test ")
+    );
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
@@ -188,7 +194,7 @@ public class StandbyTaskTest {
         }
 
         restoreStateConsumer.seekToBeginning(partition);
-        task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
+        task.update(partition2, restoreStateConsumer.poll(Duration.ofMillis(100)).records(partition2));
 
         StandbyContextImpl context = (StandbyContextImpl) task.context();
         MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
@@ -245,7 +251,7 @@ public class StandbyTaskTest {
         }
        // The commit offset is at 0L. Records should not be processed
-        List> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition));
+        List> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(Duration.ofMillis(100)).records(globalTopicPartition));
         assertEquals(5, remaining.size());
 
         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
index 725211d..140f705 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
@@ -27,6 +27,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -52,7 +53,7 @@ public class StateConsumerTest {
         partitionOffsets.put(topicOne, 20L);
         partitionOffsets.put(topicTwo, 30L);
         stateMaintainer = new StateMaintainerStub(partitionOffsets);
-        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, 10L, FLUSH_INTERVAL);
+        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, Duration.ofMillis(10L), FLUSH_INTERVAL);
     }
 
     @Test
@@ -109,7 +110,7 @@ public class StateConsumerTest {
 
     @Test
     public void shouldNotFlushWhenFlushIntervalIsZero() {
-        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, 10L, -1);
+        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, Duration.ofMillis(10L), -1);
         stateConsumer.initialize();
         time.sleep(100);
         stateConsumer.pollAndUpdate();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index aabe7ff..90abf32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -39,6 +39,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -71,7 +72,7 @@ public class StoreChangelogReaderTest {
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
     private final TopicPartition topicPartition = new TopicPartition("topic", 0);
     private final LogContext logContext = new LogContext("test-reader ");
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);
 
     @Before
     public void setUp() {
@@ -89,7 +90,7 @@ public class StoreChangelogReaderTest {
             }
         };
 
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
         changelogReader.restore(active);
         assertTrue(functionCalled.get());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3a0fc4e..5537335 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -56,6 +56,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -116,7 +117,7 @@ public class StreamTaskTest {
     private final MockProducer producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
     private final MockConsumer restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("stream-task-test ")) {
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Duration.ZERO, stateRestoreListener, new LogContext("stream-task-test ")) {
         @Override
         public Map restoredOffsets() {
             return Collections.singletonMap(changelogPartition, offset);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index c24122a..66ea3c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -49,6 +49,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -177,7 +178,7 @@ public class StreamThreadStateStoreProviderTest {
             Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
             topology,
             clientSupplier.consumer,
-            new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener(), new LogContext("test-stream-task ")),
+            new StoreChangelogReader(clientSupplier.restoreConsumer, Duration.ZERO, new MockStateRestoreListener(), new LogContext("test-stream-task ")),
             streamsConfig,
             new MockStreamsMetrics(metrics),
             stateDirectory,
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index e897088..3c8446c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueMapper;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Locale;
 import java.util.Properties;
@@ -153,7 +154,7 @@ public class BrokerCompatibilityTest {
         consumer.subscribe(Collections.singletonList(SINK_TOPIC));
 
         while (true) {
-            final ConsumerRecords records = consumer.poll(100);
+            final ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
             for (final ConsumerRecord record : records) {
                 if (record.key().equals("key") && record.value().equals("1")) {
                     return;
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 752cdd6..0b18864 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -16,16 +16,18 @@
  */
 package org.apache.kafka.streams.tests;
 
-import kafka.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.SerializationException;
@@ -40,17 +42,18 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
-import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 public class EosTestDriver extends SmokeTestUtil {
 
@@ -59,22 +62,19 @@ public class EosTestDriver extends SmokeTestUtil {
 
     private static boolean isRunning = true;
 
-    static int numRecordsProduced = 0;
+    private static int numRecordsProduced = 0;
 
-    static synchronized void updateNumRecordsProduces(final int delta) {
+    private static synchronized void updateNumRecordsProduces(final int delta) {
         numRecordsProduced += delta;
     }
 
     static void generate(final String kafka) {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                System.out.println("Terminating");
-                System.out.flush();
-                isRunning = false;
-            }
-        });
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            System.out.println("Terminating");
+            System.out.flush();
+            isRunning = false;
+        }));
 
         final Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest");
@@ -93,19 +93,16 @@ public class EosTestDriver extends SmokeTestUtil {
 
             final ProducerRecord record = new ProducerRecord<>("data", key, value);
 
-            producer.send(record, new Callback() {
-                @Override
-                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                    if (exception != null) {
-                        exception.printStackTrace(System.err);
-                        System.err.flush();
-                        if (exception instanceof TimeoutException) {
-                            try {
-                                // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
-                                    final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
-                                    updateNumRecordsProduces(-expired);
-                                } catch (Exception ignore) { }
-                            }
+            producer.send(record, (metadata, exception) -> {
+                if (exception != null) {
+                    exception.printStackTrace(System.err);
+                    System.err.flush();
+                    if (exception instanceof TimeoutException) {
+                        try {
+                            // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
+                            final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
+                            updateNumRecordsProduces(-expired);
+                        } catch (final Exception ignore) { }
                     }
                 }
             });
@@ -141,10 +138,6 @@ public class EosTestDriver extends SmokeTestUtil {
     }
 
     public static void verify(final String kafka, final boolean withRepartitioning) {
-        ensureStreamsApplicationDown(kafka);
-
-        final Map committedOffsets = getCommittedOffsets(kafka, withRepartitioning);
-
         final Properties props = new Properties();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
@@ -152,6 +145,13 @@ public class EosTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
 
+        final Map committedOffsets;
+        try (final AdminClient adminClient = KafkaAdminClient.create(props)) {
+            ensureStreamsApplicationDown(adminClient);
+
+            committedOffsets = getCommittedOffsets(adminClient, withRepartitioning);
+        }
+
         final String[] allInputTopics;
         final String[] allOutputTopics;
         if (withRepartitioning) {
@@ -218,54 +218,42 @@ public class EosTestDriver extends SmokeTestUtil {
         System.out.flush();
     }
 
-    private static void ensureStreamsApplicationDown(final String kafka) {
-        AdminClient adminClient = null;
-        try {
-            adminClient = AdminClient.createSimplePlaintext(kafka);
+    private static void ensureStreamsApplicationDown(final AdminClient adminClient) {
 
-            final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-            while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 10000).consumers().get().isEmpty()) {
-                if (System.currentTimeMillis() > maxWaitTime) {
-                    throw new RuntimeException("Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds.");
-                }
-                sleep(1000);
-            }
-        } finally {
-            if (adminClient != null) {
-                adminClient.close();
+        final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+        ConsumerGroupDescription description;
+        do {
+            description = getConsumerGroupDescription(adminClient);
+
+            if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) {
+                throw new RuntimeException(
+                    "Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds. " +
" + + "Group: " + description + ); } - } + sleep(1000); + } while (!description.members().isEmpty()); } - private static Map getCommittedOffsets(final String kafka, + + private static Map getCommittedOffsets(final AdminClient adminClient, final boolean withRepartitioning) { - final Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(ConsumerConfig.GROUP_ID_CONFIG, EosTestClient.APP_ID); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, "OffsetsClient"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + final Map topicPartitionOffsetAndMetadataMap; - final Map committedOffsets = new HashMap<>(); - try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { - final Set topics = new HashSet<>(); - topics.add("data"); - if (withRepartitioning) { - topics.add("repartition"); - } - consumer.subscribe(topics); - consumer.poll(0); + try { + final ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(EosTestClient.APP_ID); + topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } - final Set partitions = new HashSet<>(); - for (final String topic : topics) { - for (final PartitionInfo partition : consumer.partitionsFor(topic)) { - partitions.add(new TopicPartition(partition.topic(), partition.partition())); - } - } + final Map committedOffsets = new HashMap<>(); - for (final TopicPartition tp : partitions) { - final long offset = consumer.position(tp); - committedOffsets.put(tp, offset); + for (final Map.Entry entry : topicPartitionOffsetAndMetadataMap.entrySet()) { + final String topic = entry.getKey().topic(); + if (topic.equals("data") || withRepartitioning && topic.equals("repartition")) { + committedOffsets.put(entry.getKey(), entry.getValue().offset()); } } @@ -284,7 +272,7 @@ public class EosTestDriver extends SmokeTestUtil { long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; boolean allRecordsReceived = false; while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) { - final ConsumerRecords receivedRecords = consumer.poll(100); + final ConsumerRecords receivedRecords = consumer.poll(Duration.ofMillis(100)); for (final ConsumerRecord record : receivedRecords) { maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; @@ -327,19 +315,12 @@ public class EosTestDriver extends SmokeTestUtil { final TopicPartition partition = new TopicPartition(topic, record.partition()); if (verifyTopic(topic, withRepartitioning)) { - Map>> topicRecordsPerPartition - = recordPerTopicPerPartition.get(topic); + final Map>> topicRecordsPerPartition = + recordPerTopicPerPartition.computeIfAbsent(topic, k -> new HashMap<>()); - if (topicRecordsPerPartition == null) { - topicRecordsPerPartition = new HashMap<>(); - recordPerTopicPerPartition.put(topic, topicRecordsPerPartition); - } + final List> records = + topicRecordsPerPartition.computeIfAbsent(partition, k -> new ArrayList<>()); - List> records = topicRecordsPerPartition.get(partition); - if (records == null) { - records = new ArrayList<>(); - topicRecordsPerPartition.put(partition, records); - } records.add(record); } else { throw new RuntimeException("FAIL: 
@@ -397,7 +378,7 @@ public class EosTestDriver extends SmokeTestUtil {
         if (partitionInput.size() != partitionMin.size()) {
             throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
-                + partitionRecords.getKey() + " but received " + partitionMin.size());
+                    + partitionRecords.getKey() + " but received " + partitionMin.size());
         }
 
         final Iterator> inputRecords = partitionInput.iterator();
@@ -439,7 +420,7 @@ public class EosTestDriver extends SmokeTestUtil {
         if (partitionInput.size() != partitionSum.size()) {
             throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
-                + partitionRecords.getKey() + " but received " + partitionSum.size());
+                    + partitionRecords.getKey() + " but received " + partitionSum.size());
         }
 
         final Iterator> inputRecords = partitionInput.iterator();
@@ -480,7 +461,7 @@ public class EosTestDriver extends SmokeTestUtil {
         if (partitionInput.size() != partitionMax.size()) {
             throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
-                + partitionRecords.getKey() + " but received " + partitionMax.size());
+                    + partitionRecords.getKey() + " but received " + partitionMax.size());
         }
 
         final Iterator> inputRecords = partitionInput.iterator();
@@ -501,7 +482,7 @@ public class EosTestDriver extends SmokeTestUtil {
                 max = Math.max(max, value);
                 currentMinPerKey.put(key, max);
 
-                if (!receivedKey.equals(key) || receivedValue != max.intValue()) {
+                if (!receivedKey.equals(key) || receivedValue != max) {
                     throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + max + "> but was <" + receivedKey + "," + receivedValue + ">");
                 }
             }
@@ -521,7 +502,7 @@ public class EosTestDriver extends SmokeTestUtil {
         if (partitionInput.size() != partitionCnt.size()) {
             throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
-                + partitionRecords.getKey() + " but received " + partitionCnt.size());
+                    + partitionRecords.getKey() + " but received " + partitionCnt.size());
         }
 
         final Iterator> inputRecords = partitionInput.iterator();
@@ -539,7 +520,7 @@ public class EosTestDriver extends SmokeTestUtil {
                 }
                 currentSumPerKey.put(key, ++cnt);
 
-                if (!receivedKey.equals(key) || receivedValue != cnt.longValue()) {
+                if (!receivedKey.equals(key) || receivedValue != cnt) {
                     throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + cnt + "> but was <" + receivedKey + "," + receivedValue + ">");
                 }
             }
@@ -574,14 +555,11 @@ public class EosTestDriver extends SmokeTestUtil {
         for (final TopicPartition tp : partitions) {
             final ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), "key", "value");
-            producer.send(record, new Callback() {
-                @Override
-                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                    if (exception != null) {
-                        exception.printStackTrace(System.err);
-                        System.err.flush();
-                        Exit.exit(1);
-                    }
+            producer.send(record, (metadata, exception) -> {
+                if (exception != null) {
+                    exception.printStackTrace(System.err);
+                    System.err.flush();
+                    Exit.exit(1);
                 }
             });
         }
@@ -591,7 +569,7 @@ public class EosTestDriver extends SmokeTestUtil {
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords records = consumer.poll(100);
+            final ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
             if (records.isEmpty()) {
                 System.out.println("No data received.");
                 for (final TopicPartition tp : partitions) {
@@ -638,4 +616,18 @@ public class EosTestDriver extends SmokeTestUtil {
 
         return partitions;
     }
+
+    private static ConsumerGroupDescription getConsumerGroupDescription(final AdminClient adminClient) {
+        final ConsumerGroupDescription description;
+        try {
+            description = adminClient.describeConsumerGroups(Collections.singleton(EosTestClient.APP_ID))
+                .describedGroups()
+                .get(EosTestClient.APP_ID)
+                .get(10, TimeUnit.SECONDS);
+        } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
+            e.printStackTrace();
+            throw new RuntimeException("Unexpected Exception getting group description", e);
+        }
+        return description;
+    }
 }
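The new helper above shows the general shape of working with the Java AdminClient: issue the request, then bound the wait via KafkaFuture#get(timeout) rather than blocking indefinitely. A minimal standalone sketch, where the bootstrap address and group id are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ConsumerGroupDescription;

    public class GroupStatusSketch {
        public static void main(final String[] args) throws Exception {
            final Properties props = new Properties();
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            try (AdminClient adminClient = AdminClient.create(props)) {
                final String groupId = "my-app"; // placeholder group id
                // describedGroups() maps groupId -> KafkaFuture; get(timeout) bounds the wait.
                final ConsumerGroupDescription description =
                    adminClient.describeConsumerGroups(Collections.singleton(groupId))
                               .describedGroups()
                               .get(groupId)
                               .get(10, TimeUnit.SECONDS);
                System.out.println("group members: " + description.members().size());
            }
        }
    }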
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 50330a0..7533fdd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -289,7 +290,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         int retry = 0;
         final long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
-            ConsumerRecords records = consumer.poll(500);
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)
                     && verifyMax(max, allData, false)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index ad19f32..33cf1fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -74,7 +75,7 @@ public class StreamsResetterTest {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords records = consumer.poll(500);
+        final ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
         assertEquals(3, records.count());
     }
 
@@ -90,7 +91,7 @@ public class StreamsResetterTest {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords records = consumer.poll(500);
+        final ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -106,7 +107,7 @@ public class StreamsResetterTest {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L);
 
-        final ConsumerRecords records = consumer.poll(500);
+        final ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -122,7 +123,7 @@ public class StreamsResetterTest {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L);
 
-        final ConsumerRecords records = consumer.poll(500);
+        final ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -138,7 +139,7 @@ public class StreamsResetterTest {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L);
 
-        final ConsumerRecords records = consumer.poll(500);
+        final ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
         assertEquals(5, records.count());
     }
 
@@ -154,7 +155,7 @@ public class StreamsResetterTest {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L);
 
-        final ConsumerRecords records = consumer.poll(500);
+        final ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -172,7 +173,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 3L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords records = consumer.poll(500);
+        final ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -190,7 +191,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 1L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords records = consumer.poll(500);
+        final ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -208,7 +209,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 5L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords records = consumer.poll(500);
+        final ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -226,7 +227,7 @@ public class StreamsResetterTest {
         intermediateTopicPartitions.add(topicPartition);
         streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions);
 
-        final ConsumerRecords records = consumer.poll(500);
+        final ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index 3070e36..00788fd 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -85,9 +86,8 @@ public class MockRestoreConsumer extends MockConsumer {
         super.assign(partitions);
     }
 
-    @Deprecated
     @Override
-    public ConsumerRecords poll(long timeout) {
+    public ConsumerRecords poll(final Duration timeout) {
         // add buffered records to MockConsumer
         for (ConsumerRecord record : recordBuffer) {
             super.addRecord(record);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 773cbb4..7f75265 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -66,6 +66,7 @@ import org.apache.kafka.streams.test.OutputVerifier;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -327,6 +328,7 @@ public class TopologyTestDriver implements Closeable {
             consumer,
             new StoreChangelogReader(
                 createRestoreConsumer(processorTopology.storeToChangelogTopic()),
+                Duration.ZERO,
                 stateRestoreListener,
                 new LogContext("topology-test-driver ")),
             streamsConfig,

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.