Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 04E31200C5B for ; Wed, 5 Apr 2017 03:41:02 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0348B160BA1; Wed, 5 Apr 2017 01:41:02 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 52137160B90 for ; Wed, 5 Apr 2017 03:41:00 +0200 (CEST) Received: (qmail 18899 invoked by uid 500); 5 Apr 2017 01:40:59 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 18890 invoked by uid 99); 5 Apr 2017 01:40:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Apr 2017 01:40:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 52791DFE34; Wed, 5 Apr 2017 01:40:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ewencp@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-4916: test streams with brokers failing Date: Wed, 5 Apr 2017 01:40:59 +0000 (UTC) archived-at: Wed, 05 Apr 2017 01:41:02 -0000 Repository: kafka Updated Branches: refs/heads/trunk 916081007 -> 49f80b236 KAFKA-4916: test streams with brokers failing Several fixes for handling broker failures: - default replication factor for internal topics is now 3 in the test itself (not in streams code; that will require a KIP).
- streams producer waits for acks from all replicas in the test itself (not in streams code; that will require a KIP). - backoff time for the streams client before retrying after a failure to contact the controller. - fix bug related to state store locks (this helps in multi-threaded scenarios) - fix related to catching exceptions properly for network errors. - system test for all the above Author: Eno Thereska Author: Eno Thereska Reviewers: Matthias J. Sax , Damian Guy , Guozhang Wang , Dan Norwood , Ismael Juma , Ewen Cheslack-Postava Closes #2719 from enothereska/KAFKA-4916-broker-bounce-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/49f80b23 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/49f80b23 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/49f80b23 Branch: refs/heads/trunk Commit: 49f80b2360ab55274ad28405352596f9d560ec1c Parents: 9160810 Author: Eno Thereska Authored: Tue Apr 4 18:32:58 2017 -0700 Committer: Ewen Cheslack-Postava Committed: Tue Apr 4 18:32:58 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/StreamsConfig.java | 2 +- .../internals/InternalTopicManager.java | 32 ++- .../internals/StreamPartitionAssignor.java | 13 +- .../processor/internals/StreamThread.java | 53 +++-- .../processor/internals/StreamsKafkaClient.java | 20 +- .../internals/InternalTopicManagerTest.java | 15 +- .../kafka/streams/tests/SmokeTestClient.java | 29 ++- .../kafka/streams/tests/SmokeTestDriver.java | 20 +- .../kafka/test/MockInternalTopicManager.java | 5 +- .../tests/streams/streams_bounce_test.py | 4 +- .../tests/streams/streams_broker_bounce_test.py | 213 +++++++++++++++++++ 11 files changed, 354 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 94719c6..52721ef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -241,7 +241,7 @@ public class StreamsConfig extends AbstractConfig { .define(REPLICATION_FACTOR_CONFIG, Type.INT, 1, - Importance.MEDIUM, + Importance.HIGH, REPLICATION_FACTOR_DOC) .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index d8575e9..7dab99d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.StreamsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,16 +36,18 @@ public class InternalTopicManager { public static final String RETENTION_MS = "retention.ms"; public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS); private static final int MAX_TOPIC_READY_TRY = 5; - + private final Time time; private final long windowChangeLogAdditionalRetention; private final int replicationFactor; private final 
StreamsKafkaClient streamsKafkaClient; - public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor, final long windowChangeLogAdditionalRetention) { + public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor, + final long windowChangeLogAdditionalRetention, final Time time) { this.streamsKafkaClient = streamsKafkaClient; this.replicationFactor = replicationFactor; this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention; + this.time = time; } /** @@ -60,11 +63,19 @@ public class InternalTopicManager { final MetadataResponse metadata = streamsKafkaClient.fetchMetadata(); final Map existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata); final Map topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions); + if (metadata.brokers().size() < replicationFactor) { + throw new StreamsException("Found only " + metadata.brokers().size() + " brokers, " + + " but replication factor is " + replicationFactor + "." 
+ + " Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\"" + + " or add more brokers to your cluster."); + } streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention, metadata); return; } catch (StreamsException ex) { log.warn("Could not create internal topics: " + ex.getMessage() + " Retry #" + i); } + // backoff + time.sleep(100L); } throw new StreamsException("Could not create internal topics."); } @@ -73,11 +84,20 @@ public class InternalTopicManager { * Get the number of partitions for the given topics */ public Map getNumPartitions(final Set topics) { - final MetadataResponse metadata = streamsKafkaClient.fetchMetadata(); - final Map existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata); - existingTopicPartitions.keySet().retainAll(topics); + for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) { + try { + final MetadataResponse metadata = streamsKafkaClient.fetchMetadata(); + final Map existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata); + existingTopicPartitions.keySet().retainAll(topics); - return existingTopicPartitions; + return existingTopicPartitions; + } catch (StreamsException ex) { + log.warn("Could not get number of partitions: " + ex.getMessage() + " Retry #" + i); + } + // backoff + time.sleep(100L); + } + throw new StreamsException("Could not get number of partitions."); } public void close() { http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 24e6709..004926f 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.TaskAssignmentException; @@ -57,7 +58,7 @@ import static org.apache.kafka.streams.processor.internals.InternalTopicManager. public class StreamPartitionAssignor implements PartitionAssignor, Configurable { private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class); - + private Time time = Time.SYSTEM; private final static int UNKNOWN = -1; public final static int NOT_AVAILABLE = -2; @@ -159,6 +160,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable private CopartitionedTopicsValidator copartitionedTopicsValidator; /** + * Package-private method to set the time. Used for tests. + * @param time Time to be used. + */ + void time(final Time time) { + this.time = time; + } + + /** * We need to have the PartitionAssignor and its StreamThread to be mutually accessible * since the former needs later's cached metadata while sending subscriptions, * and the latter needs former's returned assignment when adding tasks. @@ -207,7 +216,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1, configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ? 
(Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) - : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT); + : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time); this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(streamThread.getName()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 9791a0a..8bd6d1a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -401,17 +401,21 @@ public class StreamThread extends Thread { @SuppressWarnings("ThrowableNotThrown") private void shutdownTasksAndState() { - log.debug("{} shutdownTasksAndState: shutting down all active tasks {} and standby tasks {}", logPrefix, - activeTasks.keySet(), standbyTasks.keySet()); + log.debug("{} shutdownTasksAndState: shutting down" + + "active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", + logPrefix, activeTasks.keySet(), standbyTasks.keySet(), + suspendedTasks.keySet(), suspendedStandbyTasks.keySet()); final AtomicReference firstException = new AtomicReference<>(null); // Close all processors in topology order - firstException.compareAndSet(null, closeAllTasks()); + firstException.compareAndSet(null, closeTasks(activeAndStandbytasks())); + firstException.compareAndSet(null, closeTasks(suspendedAndSuspendedStandbytasks())); // flush state firstException.compareAndSet(null, flushAllState()); // Close all task state managers. 
Don't need to set exception as all // state would have been flushed above - closeAllStateManagers(firstException.get() == null); + closeStateManagers(activeAndStandbytasks(), firstException.get() == null); + closeStateManagers(suspendedAndSuspendedStandbytasks(), firstException.get() == null); // only commit under clean exit if (cleanRun && firstException.get() == null) { firstException.set(commitOffsets()); @@ -430,7 +434,7 @@ public class StreamThread extends Thread { activeTasks.keySet(), standbyTasks.keySet()); final AtomicReference firstException = new AtomicReference<>(null); // Close all topology nodes - firstException.compareAndSet(null, closeAllTasksTopologies()); + firstException.compareAndSet(null, closeActiveAndStandbyTasksTopologies()); // flush state firstException.compareAndSet(null, flushAllState()); // only commit after all state has been flushed and there hasn't been an exception @@ -453,12 +457,11 @@ public class StreamThread extends Thread { void apply(final AbstractTask task); } - private RuntimeException performOnAllTasks(final AbstractTaskAction action, - final String exceptionMessage) { + private RuntimeException performOnTasks(final List tasks, + final AbstractTaskAction action, + final String exceptionMessage) { RuntimeException firstException = null; - final List allTasks = new ArrayList(activeTasks.values()); - allTasks.addAll(standbyTasks.values()); - for (final AbstractTask task : allTasks) { + for (final AbstractTask task : tasks) { try { action.apply(task); } catch (RuntimeException t) { @@ -476,8 +479,20 @@ public class StreamThread extends Thread { return firstException; } - private Throwable closeAllStateManagers(final boolean writeCheckpoint) { - return performOnAllTasks(new AbstractTaskAction() { + private List activeAndStandbytasks() { + final List tasks = new ArrayList(activeTasks.values()); + tasks.addAll(standbyTasks.values()); + return tasks; + } + + private List suspendedAndSuspendedStandbytasks() { + final List tasks = 
new ArrayList(suspendedTasks.values()); + tasks.addAll(suspendedStandbyTasks.values()); + return tasks; + } + + private Throwable closeStateManagers(final List tasks, final boolean writeCheckpoint) { + return performOnTasks(tasks, new AbstractTaskAction() { @Override public void apply(final AbstractTask task) { log.info("{} Closing the state manager of task {}", StreamThread.this.logPrefix, task.id()); @@ -488,7 +503,7 @@ public class StreamThread extends Thread { private RuntimeException commitOffsets() { // Exceptions should not prevent this call from going through all shutdown steps - return performOnAllTasks(new AbstractTaskAction() { + return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() { @Override public void apply(final AbstractTask task) { log.info("{} Committing consumer offsets of task {}", StreamThread.this.logPrefix, task.id()); @@ -498,7 +513,7 @@ public class StreamThread extends Thread { } private RuntimeException flushAllState() { - return performOnAllTasks(new AbstractTaskAction() { + return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() { @Override public void apply(final AbstractTask task) { log.info("{} Flushing state stores of task {}", StreamThread.this.logPrefix, task.id()); @@ -579,6 +594,7 @@ public class StreamThread extends Thread { } } + /** * Schedule the records processing by selecting which record is processed next. Commits may * happen as records are processed. 
@@ -1072,8 +1088,8 @@ public class StreamThread extends Thread { standbyRecords.clear(); } - private RuntimeException closeAllTasks() { - return performOnAllTasks(new AbstractTaskAction() { + private RuntimeException closeTasks(final List tasks) { + return performOnTasks(tasks, new AbstractTaskAction() { @Override public void apply(final AbstractTask task) { log.info("{} Closing task {}", StreamThread.this.logPrefix, task.id()); @@ -1083,8 +1099,9 @@ public class StreamThread extends Thread { }, "close"); } - private RuntimeException closeAllTasksTopologies() { - return performOnAllTasks(new AbstractTaskAction() { + + private RuntimeException closeActiveAndStandbyTasksTopologies() { + return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() { @Override public void apply(final AbstractTask task) { log.info("{} Closing task's topology {}", StreamThread.this.logPrefix, task.id()); http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java index 3217e46..ca3ab1b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java @@ -205,7 +205,11 @@ public class StreamsKafkaClient { break; } } - kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds()); + try { + kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds()); + } catch (final Exception e) { + throw new StreamsException("Could not poll.", e); + } } if (brokerId == null) { throw new StreamsException("Could not find any available 
broker."); @@ -237,11 +241,20 @@ public class StreamsKafkaClient { } private ClientResponse sendRequest(final ClientRequest clientRequest) { - kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds()); + try { + kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds()); + } catch (final Exception e) { + throw new StreamsException("Could not send request.", e); + } final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG); // Poll for the response. while (Time.SYSTEM.milliseconds() < responseTimeout) { - List responseList = kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds()); + final List responseList; + try { + responseList = kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds()); + } catch (final IllegalStateException e) { + throw new StreamsException("Could not poll.", e); + } if (!responseList.isEmpty()) { if (responseList.size() > 1) { throw new StreamsException("Sent one request but received multiple or no responses."); @@ -271,6 +284,7 @@ public class StreamsKafkaClient { Time.SYSTEM.milliseconds(), true); final ClientResponse clientResponse = sendRequest(clientRequest); + if (!clientResponse.hasResponse()) { throw new StreamsException("Empty response for client request."); } http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index 84f0e8a..031c732 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.Node; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.test.MockTimestampExtractor; @@ -40,7 +42,7 @@ public class InternalTopicManagerTest { private final String topic = "test_topic"; private final String userEndPoint = "localhost:2171"; private MockStreamKafkaClient streamsKafkaClient; - + private final Time time = new MockTime(); @Before public void init() { final StreamsConfig config = new StreamsConfig(configProps()); @@ -54,19 +56,22 @@ public class InternalTopicManagerTest { @Test public void shouldReturnCorrectPartitionCounts() throws Exception { - InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT); + InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, + WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time); Assert.assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic))); } @Test public void shouldCreateRequiredTopics() throws Exception { - InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT); + InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, + WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time); internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1)); } @Test 
public void shouldNotCreateTopicIfExistsWithDifferentPartitions() throws Exception { - InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT); + InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, + WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time); boolean exceptionWasThrown = false; try { internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 2)); @@ -104,7 +109,7 @@ public class InternalTopicManagerTest { Node node = new Node(1, "host1", 1001); MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList(), new ArrayList()); MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata)); - MetadataResponse response = new MetadataResponse(Collections.emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, + MetadataResponse response = new MetadataResponse(Collections.singletonList(node), null, MetadataResponse.NO_CONTROLLER_ID, Collections.singletonList(topicMetadata)); return response; } http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index 7691948..263474c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.tests; import org.apache.kafka.clients.consumer.ConsumerConfig; +import 
org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Aggregator; @@ -39,6 +40,7 @@ public class SmokeTestClient extends SmokeTestUtil { private final File stateDir; private KafkaStreams streams; private Thread thread; + private boolean uncaughtException = false; public SmokeTestClient(File stateDir, String kafka) { super(); @@ -51,10 +53,19 @@ public class SmokeTestClient extends SmokeTestUtil { streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { + System.out.println("SMOKE-TEST-CLIENT-EXCEPTION"); + uncaughtException = true; e.printStackTrace(); } }); + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + close(); + } + })); + thread = new Thread() { public void run() { streams.start(); @@ -64,32 +75,38 @@ public class SmokeTestClient extends SmokeTestUtil { } public void close() { - streams.close(); + streams.close(5, TimeUnit.SECONDS); + // do not remove these printouts since they are needed for health scripts + if (!uncaughtException) { + System.out.println("SMOKE-TEST-CLIENT-CLOSED"); + } try { thread.join(); } catch (Exception ex) { + // do not remove these printouts since they are needed for health scripts + System.out.println("SMOKE-TEST-CLIENT-EXCEPTION"); // ignore } } private static KafkaStreams createKafkaStreams(File stateDir, String kafka) { - Properties props = new Properties(); + final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest"); props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2); props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100); - 
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); + props.put(ProducerConfig.ACKS_CONFIG, "all"); - KStreamBuilder builder = new KStreamBuilder(); + KStreamBuilder builder = new KStreamBuilder(); KStream source = builder.stream(stringSerde, intSerde, "data"); - source.to(stringSerde, intSerde, "echo"); - KStream data = source.filter(new Predicate() { @Override public boolean test(String key, Integer value) { http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index b3520fb..11e1ae8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -131,13 +131,17 @@ public class SmokeTestDriver extends SmokeTestUtil { } public static Map> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception { - Properties props = new Properties(); - props.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - - KafkaProducer producer = new KafkaProducer<>(props); + final Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + 
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + // the next 4 config values make sure that all records are produced with no loss and + // no duplicates + producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); + producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); + + KafkaProducer producer = new KafkaProducer<>(producerProps); int numRecordsProduced = 0; @@ -232,7 +236,7 @@ public class SmokeTestDriver extends SmokeTestUtil { } int retry = 0; final long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(3)) { + while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) { ConsumerRecords records = consumer.poll(500); if (records.isEmpty() && recordsProcessed >= recordsGenerated) { if (verifyMin(min, allData, false) http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java index 58c6c6d..d6ada4b 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java @@ -18,6 +18,7 @@ package org.apache.kafka.test; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.internals.InternalTopicConfig; import org.apache.kafka.streams.processor.internals.InternalTopicManager; @@ -29,13 +30,15 @@ import java.util.List; import java.util.Map; import java.util.Set; + + public 
class MockInternalTopicManager extends InternalTopicManager { public Map readyTopics = new HashMap<>(); private MockConsumer restoreConsumer; public MockInternalTopicManager(StreamsConfig streamsConfig, MockConsumer restoreConsumer) { - super(new StreamsKafkaClient(streamsConfig), 0, 0); + super(new StreamsKafkaClient(streamsConfig), 0, 0, new MockTime()); this.restoreConsumer = restoreConsumer; } http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/tests/kafkatest/tests/streams/streams_bounce_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/streams/streams_bounce_test.py b/tests/kafkatest/tests/streams/streams_bounce_test.py index 169bbc1..7ac7939 100644 --- a/tests/kafkatest/tests/streams/streams_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_bounce_test.py @@ -26,7 +26,7 @@ class StreamsBounceTest(KafkaTest): """ def __init__(self, test_context): - super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={ + super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={ 'echo' : { 'partitions': 5, 'replication-factor': 2 }, 'data' : { 'partitions': 5, 'replication-factor': 2 }, 'min' : { 'partitions': 5, 'replication-factor': 2 }, @@ -42,7 +42,7 @@ class StreamsBounceTest(KafkaTest): self.driver = StreamsSmokeTestDriverService(test_context, self.kafka) self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka) - @cluster(num_nodes=5) + @cluster(num_nodes=6) def test_bounce(self): """ Start a smoke test client, then abort (kill -9) and restart it a few times. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/tests/kafkatest/tests/streams/streams_broker_bounce_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py new file mode 100644 index 0000000..86c19f9 --- /dev/null +++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py @@ -0,0 +1,213 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.tests.test import Test +from ducktape.mark.resource import cluster +from ducktape.mark import matrix +from ducktape.mark import parametrize, ignore +from kafkatest.services.kafka import KafkaService +from kafkatest.tests.kafka_test import KafkaTest +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService +import time +import signal +from random import randint + +def broker_node(test, topic, broker_type): + """ Discover node of requested type. 
For leader type, discovers leader for our topic and partition 0 + """ + if broker_type == "leader": + node = test.kafka.leader(topic, partition=0) + elif broker_type == "controller": + node = test.kafka.controller() + else: + raise Exception("Unexpected broker type %s." % (broker_type)) + + return node + +def signal_node(test, node, sig): + test.kafka.signal_node(node, sig) + +def clean_shutdown(test, topic, broker_type): + """Discover broker node of requested type and shut it down cleanly. + """ + node = broker_node(test, topic, broker_type) + signal_node(test, node, signal.SIGTERM) + +def hard_shutdown(test, topic, broker_type): + """Discover broker node of requested type and shut it down with a hard kill.""" + node = broker_node(test, topic, broker_type) + signal_node(test, node, signal.SIGKILL) + + +failures = { + "clean_shutdown": clean_shutdown, + "hard_shutdown": hard_shutdown +} + +class StreamsBrokerBounceTest(Test): + """ + Simple test of Kafka Streams with brokers failing + """ + + def __init__(self, test_context): + super(StreamsBrokerBounceTest, self).__init__(test_context) + self.replication = 3 + self.partitions = 3 + self.topics = { + 'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication, + 'configs': {"min.insync.replicas": 2}}, + 'data' : { 'partitions': self.partitions, 'replication-factor': self.replication, + 'configs': {"min.insync.replicas": 2} }, + 'min' : { 'partitions': self.partitions, 'replication-factor': self.replication, + 'configs': {"min.insync.replicas": 2} }, + 'max' : { 'partitions': self.partitions, 'replication-factor': self.replication, + 'configs': {"min.insync.replicas": 2} }, + 'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication, + 'configs': {"min.insync.replicas": 2} }, + 'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication, + 'configs': {"min.insync.replicas": 2} }, + 'cnt' : { 'partitions': self.partitions, 'replication-factor': 
self.replication, + 'configs': {"min.insync.replicas": 2} }, + 'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication, + 'configs': {"min.insync.replicas": 2} }, + 'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication, + 'configs': {"min.insync.replicas": 2} }, + 'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication, + 'configs': {"min.insync.replicas": 2} } + } + + def fail_broker_type(self, failure_mode, broker_type): + # Pick a random topic and bounce it's leader + topic_index = randint(0, len(self.topics.keys()) - 1) + topic = self.topics.keys()[topic_index] + failures[failure_mode](self, topic, broker_type) + + def fail_many_brokers(self, failure_mode, num_failures): + sig = signal.SIGTERM + if (failure_mode == "clean_shutdown"): + sig = signal.SIGTERM + else: + sig = signal.SIGKILL + + for num in range(0, num_failures - 1): + signal_node(self, self.kafka.nodes[num], sig) + + + def setup_system(self): + # Setup phase + self.zk = ZookeeperService(self.test_context, num_nodes=1) + self.zk.start() + + self.kafka = KafkaService(self.test_context, num_nodes=self.replication, + zk=self.zk, topics=self.topics) + self.kafka.start() + # Start test harness + self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) + self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka) + + + self.driver.start() + self.processor1.start() + + def collect_results(self, sleep_time_secs): + data = {} + # End test + self.driver.wait() + self.driver.stop() + + self.processor1.stop() + + node = self.driver.node + + # Success is declared if streams does not crash when sleep time > 0 + # It should give an exception when sleep time is 0 since we kill the brokers immediately + # and the topic manager cannot create internal topics with the desired replication factor + if (sleep_time_secs == 0): + output_streams = self.processor1.node.account.ssh_capture("grep 
SMOKE-TEST-CLIENT-EXCEPTION %s" % self.processor1.STDOUT_FILE, allow_fail=False) + else: + output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False) + + for line in output_streams: + data["Client closed"] = line + + # Currently it is hard to guarantee anything about Kafka since we don't have exactly once. + # With exactly once in place, success will be defined as ALL-RECORDS-DELIVERED and SUCCESS + output = node.account.ssh_capture("grep -E 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED|PROCESSED-LESS-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False) + for line in output: + data["Records Delivered"] = line + output = node.account.ssh_capture("grep -E 'SUCCESS|FAILURE' %s" % self.driver.STDOUT_FILE, allow_fail=False) + for line in output: + data["Logic Success/Failure"] = line + + + return data + + @cluster(num_nodes=7) + @matrix(failure_mode=["clean_shutdown", "hard_shutdown"], + broker_type=["leader", "controller"], + sleep_time_secs=[120]) + def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs): + """ + Start a smoke test client, then kill one particular broker and ensure data is still received + Record if records are delivered.
+ """ + self.setup_system() + + # Sleep to allow test to run for a bit + time.sleep(sleep_time_secs) + + # Fail brokers + self.fail_broker_type(failure_mode, broker_type); + + return self.collect_results(sleep_time_secs) + + @cluster(num_nodes=7) + @matrix(failure_mode=["clean_shutdown"], + broker_type=["controller"], + sleep_time_secs=[0]) + def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs): + """ + Start a smoke test client, then kill one particular broker immediately after streams starts. + Streams should throw an exception since it cannot create topics with the desired + replication factor of 3 + """ + self.setup_system() + + # Sleep to allow test to run for a bit + time.sleep(sleep_time_secs) + + # Fail brokers + self.fail_broker_type(failure_mode, broker_type); + + return self.collect_results(sleep_time_secs) + + @cluster(num_nodes=7) + @matrix(failure_mode=["clean_shutdown", "hard_shutdown"], + num_failures=[2]) + def test_many_brokers_bounce(self, failure_mode, num_failures): + """ + Start a smoke test client, then kill a few brokers and ensure data is still received + Record if records are delivered + """ + self.setup_system() + + # Sleep to allow test to run for a bit + time.sleep(120) + + # Fail brokers + self.fail_many_brokers(failure_mode, num_failures); + + return self.collect_results(120)