Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 079D9200C7D for ; Tue, 16 May 2017 23:24:36 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 062C1160BC1; Tue, 16 May 2017 21:24:36 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F2C7E160BA6 for ; Tue, 16 May 2017 23:24:34 +0200 (CEST) Received: (qmail 90258 invoked by uid 500); 16 May 2017 21:24:34 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 90249 invoked by uid 99); 16 May 2017 21:24:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 May 2017 21:24:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EEE83DFD43; Tue, 16 May 2017 21:24:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Message-Id: <4d44fddfa28143ae858c92f2d7ca41cd@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-5166: Add option "dry run" to Streams application reset tool Date: Tue, 16 May 2017 21:24:33 +0000 (UTC) archived-at: Tue, 16 May 2017 21:24:36 -0000 Repository: kafka Updated Branches: refs/heads/trunk 73703a15c -> d1d71aa29 KAFKA-5166: Add option "dry run" to Streams application reset tool Addressed the below review comment from #PR #2998 from mjsax I am wondering if it would be better, to "embed" the dry-run into the actual code 
and branch on each place. Otherwise, if things get changed, we could easily introduce bugs (i.e., dry run shows something different than what the actual reset code does). We could introduce methods like maybeSeekToBeginning() that either does the seek or only prints to stdout. This would ensure that the main logic is used to "feed" into dry-run and we don't have code duplication. WDYT? Author: Bharat Viswanadham Reviewers: Matthias J. Sax, Damian Guy, Eno Thereska, Guozhang Wang Closes #3005 from bharatviswa504/KAFKA-5166 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d1d71aa2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d1d71aa2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d1d71aa2 Branch: refs/heads/trunk Commit: d1d71aa29afd4aa959864225d2fec50c71513481 Parents: 73703a1 Author: Bharat Viswanadham Authored: Tue May 16 14:24:31 2017 -0700 Committer: Guozhang Wang Committed: Tue May 16 14:24:31 2017 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/tools/StreamsResetter.java | 151 +++++++++++++++---- 1 file changed, 120 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d1d71aa2/core/src/main/scala/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 83166cd..a218125 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -16,13 +16,17 @@ */ package kafka.tools; + import joptsimple.OptionException; import joptsimple.OptionParser; import joptsimple.OptionSet; import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; + import kafka.admin.AdminClient; import
kafka.admin.TopicCommand; import kafka.utils.ZkUtils; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; @@ -32,6 +36,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Exit; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -69,10 +74,12 @@ public class StreamsResetter { private static OptionSpec applicationIdOption; private static OptionSpec inputTopicsOption; private static OptionSpec intermediateTopicsOption; + private static OptionSpecBuilder dryRunOption; private OptionSet options = null; private final Properties consumerConfig = new Properties(); private final List allTopics = new LinkedList<>(); + private boolean dryRun = false; public int run(final String[] args) { return run(args, new Properties()); @@ -88,13 +95,11 @@ public class StreamsResetter { ZkUtils zkUtils = null; try { parseArguments(args); + dryRun = options.has(dryRunOption); adminClient = AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption)); final String groupId = options.valueOf(applicationIdOption); - if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) { - throw new IllegalStateException("Consumer group '" + groupId + "' is still active. 
" + - "Make sure to stop all running application instances before running the reset tool."); - } + zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption), 30000, @@ -104,8 +109,18 @@ public class StreamsResetter { allTopics.clear(); allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); - resetInputAndInternalAndSeekToEndIntermediateTopicOffsets(); - deleteInternalTopics(zkUtils); + + if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) { + throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " + + "Make sure to stop all running application instances before running the reset tool."); + } + + if (dryRun) { + System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----"); + } + maybeResetInputAndInternalAndSeekToEndIntermediateTopicOffsets(); + maybeDeleteInternalTopics(zkUtils); + } catch (final Throwable e) { exitCode = EXIT_CODE_ERROR; System.err.println("ERROR: " + e.getMessage()); @@ -148,6 +163,7 @@ public class StreamsResetter { .ofType(String.class) .withValuesSeparatedBy(',') .describedAs("list"); + dryRunOption = optionParser.accepts("dry-run", "Option to indicate to run streams reset tool to display actions it will perform"); try { options = optionParser.parse(args); @@ -157,39 +173,43 @@ public class StreamsResetter { } } - private void resetInputAndInternalAndSeekToEndIntermediateTopicOffsets() { + private void maybeResetInputAndInternalAndSeekToEndIntermediateTopicOffsets() { final List inputTopics = options.valuesOf(inputTopicsOption); final List intermediateTopics = options.valuesOf(intermediateTopicsOption); + final List internalTopics = new ArrayList<>(); + + final List notFoundInputTopics = new ArrayList<>(); + final List notFoundIntermediateTopics = new ArrayList<>(); + + String groupId = options.valueOf(applicationIdOption); + if (inputTopics.size() == 0 && intermediateTopics.size() == 0) { 
System.out.println("No input or intermediate topics specified. Skipping seek."); return; - } else { + } + + if (!dryRun) { if (inputTopics.size() != 0) { - System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics."); + System.out.println("Seek-to-beginning for input topics " + inputTopics + " and all internal topics."); } if (intermediateTopics.size() != 0) { System.out.println("Seek-to-end for intermediate topics " + intermediateTopics); } } - final Properties config = new Properties(); - config.putAll(consumerConfig); - config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption)); - config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(applicationIdOption)); - config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - final Set topicsToSubscribe = new HashSet<>(inputTopics.size() + intermediateTopics.size()); + for (final String topic : inputTopics) { if (!allTopics.contains(topic)) { - System.err.println("Input topic " + topic + " not found. Skipping."); + notFoundInputTopics.add(topic); } else { topicsToSubscribe.add(topic); } } for (final String topic : intermediateTopics) { if (!allTopics.contains(topic)) { - System.err.println("Intermediate topic " + topic + " not found. 
Skipping."); + notFoundIntermediateTopics.add(topic); } else { topicsToSubscribe.add(topic); } @@ -197,9 +217,16 @@ public class StreamsResetter { for (final String topic : allTopics) { if (isInternalTopic(topic)) { topicsToSubscribe.add(topic); + internalTopics.add(topic); } } + final Properties config = new Properties(); + config.putAll(consumerConfig); + config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption)); + config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + try (final KafkaConsumer client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { client.subscribe(topicsToSubscribe); client.poll(1); @@ -219,25 +246,83 @@ public class StreamsResetter { } } - if (inputAndInternalTopicPartitions.size() > 0) { - client.seekToBeginning(inputAndInternalTopicPartitions); + maybeSeekToBeginning(client, inputAndInternalTopicPartitions, internalTopics); + + maybeSeekToEnd(client, intermediateTopicPartitions); + + if (!dryRun) { + for (final TopicPartition p : partitions) { + client.position(p); + } + client.commitSync(); } - if (intermediateTopicPartitions.size() > 0) { - client.seekToEnd(intermediateTopicPartitions); + + if (notFoundInputTopics.size() > 0) { + System.out.println("Following input topics are not found, skipping them"); + for (final String topic : notFoundInputTopics) { + System.out.println("Topic: " + topic); + } } - for (final TopicPartition p : partitions) { - client.position(p); + if (notFoundIntermediateTopics.size() > 0) { + System.out.println("Following intermediate topics are not found, skipping them"); + for (final String topic : notFoundIntermediateTopics) { + System.out.println("Topic:" + topic); + } } - client.commitSync(); + } catch (final RuntimeException e) { System.err.println("ERROR: Resetting offsets failed."); throw e; } - System.out.println("Done."); } + private void 
maybeSeekToEnd(final KafkaConsumer client, final Set intermediateTopicPartitions) { + + final String groupId = options.valueOf(applicationIdOption); + final List intermediateTopics = options.valuesOf(intermediateTopicsOption); + + if (intermediateTopicPartitions.size() > 0) { + if (!dryRun) { + client.seekToEnd(intermediateTopicPartitions); + } else { + System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")"); + for (final String topic : intermediateTopics) { + if (allTopics.contains(topic)) { + System.out.println("Topic: " + topic); + } + } + } + } + + } + + private void maybeSeekToBeginning(final KafkaConsumer client, + final Set inputAndInternalTopicPartitions, + final List internalTopics) { + + final List inputTopics = options.valuesOf(inputTopicsOption); + final String groupId = options.valueOf(applicationIdOption); + + if (inputAndInternalTopicPartitions.size() > 0) { + if (!dryRun) { + client.seekToBeginning(inputAndInternalTopicPartitions); + } else { + System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + groupId + ")"); + for (final String topic : inputTopics) { + if (allTopics.contains(topic)) { + System.out.println("Topic: " + topic); + } + } + System.out.println("Following internal topics offsets will be reset to beginning (for consumer group " + groupId + ")"); + for (final String topic : internalTopics) { + System.out.println("Topic: " + topic); + } + } + } + } + private boolean isInputTopic(final String topic) { return options.valuesOf(inputTopicsOption).contains(topic); } @@ -246,23 +331,27 @@ public class StreamsResetter { return options.valuesOf(intermediateTopicsOption).contains(topic); } - private void deleteInternalTopics(final ZkUtils zkUtils) { + private void maybeDeleteInternalTopics(final ZkUtils zkUtils) { + System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption)); for 
(final String topic : allTopics) { if (isInternalTopic(topic)) { - final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{ - "--zookeeper", options.valueOf(zookeeperOption), - "--delete", "--topic", topic}); try { - TopicCommand.deleteTopic(zkUtils, commandOptions); + if (!dryRun) { + final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{ + "--zookeeper", options.valueOf(zookeeperOption), + "--delete", "--topic", topic}); + TopicCommand.deleteTopic(zkUtils, commandOptions); + } else { + System.out.println("Topic: " + topic); + } } catch (final RuntimeException e) { System.err.println("ERROR: Deleting topic " + topic + " failed."); throw e; } } } - System.out.println("Done."); }