kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4733: Improve Streams Reset Tool console output
Date Wed, 08 Feb 2017 08:08:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7f6bf5d44 -> 2e662a061


KAFKA-4733: Improve Streams Reset Tool console output

Added a general explanation of the tool and what it does. Also added a few details to the arguments.

Author: Gwen Shapira <cshapi@gmail.com>

Reviewers: Matthias J. Sax, Michael G. Noll, Guozhang Wang

Closes #2503 from gwenshap/KAFKA-4733


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e662a06
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e662a06
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e662a06

Branch: refs/heads/trunk
Commit: 2e662a061608a0755274da3e2ccf87401936febf
Parents: 7f6bf5d
Author: Gwen Shapira <cshapi@gmail.com>
Authored: Wed Feb 8 00:08:07 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Feb 8 00:08:07 2017 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/tools/StreamsResetter.java | 26 +++++++++++++++++---
 1 file changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e662a06/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 0c9f26e..a79cd39 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -138,12 +138,12 @@ public class StreamsResetter {
             .ofType(String.class)
             .defaultsTo("localhost:2181")
             .describedAs("url");
-        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of
user input topics")
+        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of
user input topics. For these topics, the tool will reset the offset to the earliest available
offset.")
             .withRequiredArg()
             .ofType(String.class)
             .withValuesSeparatedBy(',')
             .describedAs("list");
-        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated
list of intermediate user topics")
+        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated
list of intermediate user topics (topics used in the through() method). For these topics,
the tool will skip to the end.")
             .withRequiredArg()
             .ofType(String.class)
             .withValuesSeparatedBy(',')
@@ -152,7 +152,7 @@ public class StreamsResetter {
         try {
             options = optionParser.parse(args);
         } catch (final OptionException e) {
-            optionParser.printHelpOn(System.err);
+            printHelp(optionParser);
             throw e;
         }
     }
@@ -271,6 +271,26 @@ public class StreamsResetter {
             && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
     }
 
+    private void printHelp(OptionParser parser) throws IOException {
+        System.err.println("The Application Reset Tool allows you to quickly reset an application
in order to reprocess "
+                + "its data from scratch.\n"
+                + "* This tool resets offsets of input topics to the earliest available offset
and it skips to the end of "
+                + "intermediate topics (topics used in the through() method).\n"
+                + "* This tool deletes the internal topics that were created by Kafka Streams
(topics starting with "
+                + "\"<application.id>-\").\n"
+                + "You do not need to specify internal topics because the tool finds them
automatically.\n"
+                + "* This tool will not delete output topics (if you want to delete them,
you need to do it yourself "
+                + "with the bin/kafka-topics.sh command).\n"
+                + "* This tool will not clean up the local state on the stream application
instances (the persisted "
+                + "stores used to cache aggregation results).\n"
+                + "You need to call KafkaStreams#cleanUp() in your application or manually
delete them from the "
+                + "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/<application.id>
by default).\n\n"
+                + "*** Important! You will get wrong output if you don't clean up the local
stores after running the "
+                + "reset tool!\n\n"
+        );
+        parser.printHelpOn(System.err);
+    }
+
     public static void main(final String[] args) {
         Exit.exit(new StreamsResetter().run(args));
     }


Mime
View raw message