kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject git commit: KAFKA-1291 Add wrapper scripts and usage information to each command.
Date Thu, 19 Jun 2014 21:04:28 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d0c019a07 -> 6b0ae4bba


KAFKA-1291 Add wrapper scripts and usage information to each command.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6b0ae4bb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6b0ae4bb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6b0ae4bb

Branch: refs/heads/trunk
Commit: 6b0ae4bba0d0f8e4c8da19de65a8f03f162bec39
Parents: d0c019a
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Wed Jun 18 09:41:20 2014 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Thu Jun 19 14:04:06 2014 -0700

----------------------------------------------------------------------
 bin/kafka-consumer-offset-checker.sh            | 17 ++++++
 bin/kafka-mirror-maker.sh                       | 17 ++++++
 bin/kafka-replica-verification.sh               | 17 ++++++
 bin/kafka-simple-consumer-perf-test.sh          | 21 --------
 bin/windows/kafka-consumer-offset-checker.bat   | 17 ++++++
 bin/windows/kafka-consumer-perf-test.bat        | 20 +++++++
 bin/windows/kafka-mirror-maker.bat              | 17 ++++++
 .../kafka-preferred-replica-election.bat        | 17 ++++++
 bin/windows/kafka-producer-perf-test.bat        | 20 +++++++
 bin/windows/kafka-reassign-partitions.bat       | 17 ++++++
 bin/windows/kafka-replay-log-producer.bat       | 17 ++++++
 bin/windows/kafka-replica-verification.bat      | 17 ++++++
 bin/windows/kafka-simple-consumer-shell.bat     | 17 ++++++
 bin/windows/zookeeper-shell.bat                 | 22 ++++++++
 .../PreferredReplicaLeaderElectionCommand.scala |  4 ++
 .../kafka/admin/ReassignPartitionsCommand.scala | 28 ++++------
 .../main/scala/kafka/admin/TopicCommand.scala   | 10 ++--
 .../scala/kafka/tools/ConsoleConsumer.scala     | 26 +++------
 .../scala/kafka/tools/ConsoleProducer.scala     | 12 ++---
 .../kafka/tools/ConsumerOffsetChecker.scala     | 10 ++--
 .../scala/kafka/tools/ConsumerPerformance.scala | 10 +---
 .../scala/kafka/tools/DumpLogSegments.scala     | 10 ++--
 .../scala/kafka/tools/ExportZkOffsets.scala     | 13 ++---
 .../main/scala/kafka/tools/GetOffsetShell.scala |  3 ++
 .../scala/kafka/tools/ImportZkOffsets.scala     | 13 ++---
 core/src/main/scala/kafka/tools/JmxTool.scala   |  5 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    |  3 ++
 .../scala/kafka/tools/ProducerPerformance.scala | 11 ++--
 .../scala/kafka/tools/ReplayLogProducer.scala   | 12 ++---
 .../kafka/tools/ReplicaVerificationTool.scala   |  4 +-
 .../kafka/tools/SimpleConsumerPerformance.scala |  9 +---
 .../scala/kafka/tools/SimpleConsumerShell.scala | 13 ++---
 .../kafka/tools/StateChangeLogMerger.scala      |  5 +-
 .../scala/kafka/tools/TestLogCleaning.scala     |  8 +--
 .../kafka/tools/VerifyConsumerRebalance.scala   | 12 ++---
 .../scala/kafka/utils/CommandLineUtils.scala    | 56 ++++++++++++--------
 .../other/kafka/TestLinearWriteSpeed.scala      |  8 +--
 37 files changed, 358 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/kafka-consumer-offset-checker.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-consumer-offset-checker.sh b/bin/kafka-consumer-offset-checker.sh
new file mode 100755
index 0000000..c275f7e
--- /dev/null
+++ b/bin/kafka-consumer-offset-checker.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/kafka-mirror-maker.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-mirror-maker.sh b/bin/kafka-mirror-maker.sh
new file mode 100755
index 0000000..56e342c
--- /dev/null
+++ b/bin/kafka-mirror-maker.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/kafka-replica-verification.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-replica-verification.sh b/bin/kafka-replica-verification.sh
new file mode 100755
index 0000000..ee6d19e
--- /dev/null
+++ b/bin/kafka-replica-verification.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplicaVerificationTool $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/kafka-simple-consumer-perf-test.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-simple-consumer-perf-test.sh b/bin/kafka-simple-consumer-perf-test.sh
deleted file mode 100755
index b1a5cfc..0000000
--- a/bin/kafka-simple-consumer-perf-test.sh
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
-    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
-fi
-
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/windows/kafka-consumer-offset-checker.bat
----------------------------------------------------------------------
diff --git a/bin/windows/kafka-consumer-offset-checker.bat b/bin/windows/kafka-consumer-offset-checker.bat
new file mode 100644
index 0000000..b6967c4
--- /dev/null
+++ b/bin/windows/kafka-consumer-offset-checker.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.tools.ConsumerOffsetChecker %*

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/windows/kafka-consumer-perf-test.bat
----------------------------------------------------------------------
diff --git a/bin/windows/kafka-consumer-perf-test.bat b/bin/windows/kafka-consumer-perf-test.bat
new file mode 100644
index 0000000..afc2259
--- /dev/null
+++ b/bin/windows/kafka-consumer-perf-test.bat
@@ -0,0 +1,20 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+SetLocal
+set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
+%~dp0kafka-run-class.bat kafka.tools.ConsumerPerformance %*
+EndLocal

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/windows/kafka-mirror-maker.bat
----------------------------------------------------------------------
diff --git a/bin/windows/kafka-mirror-maker.bat b/bin/windows/kafka-mirror-maker.bat
new file mode 100644
index 0000000..819e7d8
--- /dev/null
+++ b/bin/windows/kafka-mirror-maker.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.tools.MirrorMaker %*

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/windows/kafka-preferred-replica-election.bat
----------------------------------------------------------------------
diff --git a/bin/windows/kafka-preferred-replica-election.bat b/bin/windows/kafka-preferred-replica-election.bat
new file mode 100644
index 0000000..a9a5b7e
--- /dev/null
+++ b/bin/windows/kafka-preferred-replica-election.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.admin.PreferredReplicaLeaderElectionCommand %*

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/windows/kafka-producer-perf-test.bat
----------------------------------------------------------------------
diff --git a/bin/windows/kafka-producer-perf-test.bat b/bin/windows/kafka-producer-perf-test.bat
new file mode 100644
index 0000000..a894752
--- /dev/null
+++ b/bin/windows/kafka-producer-perf-test.bat
@@ -0,0 +1,20 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+SetLocal
+set KAFKA_HEAP_OPTS=-Xmx512M
+%~dp0kafka-run-class.bat kafka.tools.ProducerPerformance %*
+EndLocal

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/windows/kafka-reassign-partitions.bat
----------------------------------------------------------------------
diff --git a/bin/windows/kafka-reassign-partitions.bat b/bin/windows/kafka-reassign-partitions.bat
new file mode 100644
index 0000000..0c13ee3
--- /dev/null
+++ b/bin/windows/kafka-reassign-partitions.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.admin.ReassignPartitionsCommand %*

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/windows/kafka-replay-log-producer.bat
----------------------------------------------------------------------
diff --git a/bin/windows/kafka-replay-log-producer.bat b/bin/windows/kafka-replay-log-producer.bat
new file mode 100644
index 0000000..2aec326
--- /dev/null
+++ b/bin/windows/kafka-replay-log-producer.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.tools.ReplayLogProducer %*

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/windows/kafka-replica-verification.bat
----------------------------------------------------------------------
diff --git a/bin/windows/kafka-replica-verification.bat b/bin/windows/kafka-replica-verification.bat
new file mode 100644
index 0000000..481db57
--- /dev/null
+++ b/bin/windows/kafka-replica-verification.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.tools.ReplicaVerificationTool %*

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/windows/kafka-simple-consumer-shell.bat
----------------------------------------------------------------------
diff --git a/bin/windows/kafka-simple-consumer-shell.bat b/bin/windows/kafka-simple-consumer-shell.bat
new file mode 100644
index 0000000..4e6ea0c
--- /dev/null
+++ b/bin/windows/kafka-simple-consumer-shell.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+%~dp0kafka-run-class.bat kafka.tools.SimpleConsumerShell %*

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/bin/windows/zookeeper-shell.bat
----------------------------------------------------------------------
diff --git a/bin/windows/zookeeper-shell.bat b/bin/windows/zookeeper-shell.bat
new file mode 100644
index 0000000..e98f069
--- /dev/null
+++ b/bin/windows/zookeeper-shell.bat
@@ -0,0 +1,22 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+IF [%1] EQU [] (
+	echo USAGE: %0 zookeeper_host:port[/path] [args...]
+	EXIT /B 1
+)
+
+%~dp0kafka-run-class.bat org.apache.zookeeper.ZooKeeperMain -server %*

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 9b3c6ae..c791848 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -40,6 +40,10 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       .withRequiredArg
       .describedAs("urls")
       .ofType(classOf[String])
+      
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "This tool causes leadership for each partition to be transferred back to the 'preferred replica'," + 
+                                                " it can be used to balance leadership among the servers.")
 
     val options = parser.parse(args : _*)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2637586..691d69a 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -31,10 +31,8 @@ object ReassignPartitionsCommand extends Logging {
 
     // should have exactly one action
     val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has _)
-    if(actions != 1) {
-      opts.parser.printHelpOn(System.err)
-      Utils.croak("Command must include exactly one action: --generate, --execute or --verify")
-    }
+    if(actions != 1)
+      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --generate, --execute or --verify")
 
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
 
@@ -58,10 +56,8 @@ object ReassignPartitionsCommand extends Logging {
   }
 
   def verifyAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
-    if(!opts.options.has(opts.reassignmentJsonFileOpt)) {
-      opts.parser.printHelpOn(System.err)
-      Utils.croak("If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
-    }
+    if(!opts.options.has(opts.reassignmentJsonFileOpt))
+      CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
     val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val jsonString = Utils.readFileAsString(jsonFile)
     val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
@@ -81,10 +77,8 @@ object ReassignPartitionsCommand extends Logging {
   }
 
   def generateAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
-    if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt))) {
-      opts.parser.printHelpOn(System.err)
-      Utils.croak("If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
-    }
+    if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt)))
+      CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
     val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
     val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
@@ -105,11 +99,8 @@ object ReassignPartitionsCommand extends Logging {
   }
 
   def executeAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
-    if(!opts.options.has(opts.reassignmentJsonFileOpt)) {
-      opts.parser.printHelpOn(System.err)
-      Utils.croak("If --execute option is used, command must include --reassignment-json-file that was output " +
-        "during the --generate option")
-    }
+    if(!opts.options.has(opts.reassignmentJsonFileOpt))
+      CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option")
     val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
     val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString)
@@ -185,6 +176,9 @@ object ReassignPartitionsCommand extends Logging {
                       .withRequiredArg
                       .describedAs("brokerlist")
                       .ofType(classOf[String])
+                      
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between replicas.")
 
     val options = parser.parse(args : _*)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 6788c2e..8d5c2e7 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -35,13 +35,13 @@ object TopicCommand {
     
     val opts = new TopicCommandOptions(args)
     
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.")
+    
     // should have exactly one action
     val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
-    if(actions != 1) {
-      System.err.println("Command must include exactly one action: --list, --describe, --create, --alter or --delete")
-      opts.parser.printHelpOn(System.err)
-      System.exit(1)
-    }
+    if(actions != 1) 
+      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
 
     opts.checkArgs()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index f6bc2f1..323fc85 100644
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -84,15 +84,15 @@ object ConsoleConsumer extends Logging {
       .describedAs("metrics dictory")
       .ofType(classOf[java.lang.String])
 
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
+      
     var groupIdPassed = true
     val options: OptionSet = tryParse(parser, args)
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
-    if (topicOrFilterOpt.size != 1) {
-      error("Exactly one of whitelist/blacklist/topic is required.")
-      parser.printHelpOn(System.err)
-      System.exit(1)
-    }
+    if (topicOrFilterOpt.size != 1)
+      CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
     val topicArg = options.valueOf(topicOrFilterOpt.head)
     val filterSpec = if (options.has(blacklistOpt))
       new Blacklist(topicArg)
@@ -144,7 +144,7 @@ object ConsoleConsumer extends Logging {
     val config = new ConsumerConfig(consumerProps)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
-    val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
+    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
     val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
     val connector = Consumer.create(config)
 
@@ -217,20 +217,6 @@ object ConsoleConsumer extends Logging {
   }
 }
 
-object MessageFormatter {
-  def tryParseFormatterArgs(args: Iterable[String]): Properties = {
-    val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
-    if(!splits.forall(_.length == 2)) {
-      System.err.println("Invalid parser arguments: " + args.mkString(" "))
-      System.exit(1)
-    }
-    val props = new Properties
-    for(a <- splits)
-      props.put(a(0), a(1))
-    props
-  }
-}
-
 trait MessageFormatter {
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
   def init(props: Properties) {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index f4e07d4..da4dad4 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -212,13 +212,9 @@ object ConsoleProducer {
     val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
 
     val options = parser.parse(args : _*)
-    for(arg <- List(topicOpt, brokerListOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.")
+    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt)
 
     import scala.collection.JavaConversions._
     val useNewProducer = options.has(useNewProducerOpt)
@@ -243,7 +239,7 @@ object ConsoleProducer {
     val valueEncoderClass = options.valueOf(valueEncoderOpt)
     val readerClass = options.valueOf(messageReaderOpt)
     val socketBuffer = options.valueOf(socketBufferSizeOpt)
-    val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
+    val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
     /* new producer related configs */
     val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
     val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 19df757..d1e7c43 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -123,6 +123,9 @@ object ConsumerOffsetChecker extends Logging {
 
     parser.accepts("broker-info", "Print broker info")
     parser.accepts("help", "Print this message.")
+    
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.")
 
     val options = parser.parse(args : _*)
 
@@ -131,12 +134,7 @@ object ConsumerOffsetChecker extends Logging {
        System.exit(0)
     }
 
-    for (opt <- List(groupOpt, zkConnectOpt))
-      if (!options.has(opt)) {
-        System.err.println("Missing required argument: %s".format(opt))
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
+    CommandLineUtils.checkRequiredArgs(parser, options, groupOpt, zkConnectOpt)
 
     val zkConnect = options.valueOf(zkConnectOpt)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 4688349..093c800 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
 import org.apache.log4j.Logger
 import kafka.message.Message
-import kafka.utils.ZkUtils
+import kafka.utils.{ZkUtils, CommandLineUtils}
 import java.util.{ Random, Properties }
 import kafka.consumer._
 import java.text.SimpleDateFormat
@@ -120,13 +120,7 @@ object ConsumerPerformance {
 
     val options = parser.parse(args: _*)
 
-    for (arg <- List(topicOpt, zkConnectOpt)) {
-      if (!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, zkConnectOpt)
 
     val props = new Properties
     props.put("group.id", options.valueOf(groupIdOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index f0ab02a..6daf87b 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -41,13 +41,13 @@ object DumpLogSegments {
                                   .ofType(classOf[java.lang.Integer])
                                   .defaultsTo(5 * 1024 * 1024)
     val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration")
+    
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
 
     val options = parser.parse(args : _*)
-    if(!options.has(filesOpt)) {
-      System.err.println("Missing required argument \"" + filesOpt + "\"")
-      parser.printHelpOn(System.err)
-      System.exit(1)
-    }
+    
+    CommandLineUtils.checkRequiredArgs(parser, options, filesOpt)
 
     val print = if(options.has(printOpt)) true else false
     val verifyOnly = if(options.has(verifyOpt)) true else false

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 005231f..4d051bc 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import java.io.FileWriter
 import joptsimple._
-import kafka.utils.{Logging, ZkUtils, ZKStringSerializer,ZKGroupTopicDirs}
+import kafka.utils.{Logging, ZkUtils, ZKStringSerializer, ZKGroupTopicDirs, CommandLineUtils}
 import org.I0Itec.zkclient.ZkClient
 
 
@@ -55,6 +55,9 @@ object ExportZkOffsets extends Logging {
                             .withRequiredArg()
                             .ofType(classOf[String])
     parser.accepts("help", "Print this message.")
+    
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Export consumer offsets to an output file.")
             
     val options = parser.parse(args : _*)
     
@@ -63,13 +66,7 @@ object ExportZkOffsets extends Logging {
        System.exit(0)
     }
     
-    for (opt <- List(zkConnectOpt, outFileOpt)) {
-      if (!options.has(opt)) {
-        System.err.println("Missing required argument: %s".format(opt))
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt)
     
     val zkConnect  = options.valueOf(zkConnectOpt)
     val groups     = options.valuesOf(groupOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/GetOffsetShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index fba652e..9c6064e 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -57,6 +57,9 @@ object GetOffsetShell {
                            .describedAs("ms")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1000)
+                           
+   if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")
 
     val options = parser.parse(args : _*)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index c8023ee..abe0972 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -20,7 +20,7 @@ package kafka.tools
 import java.io.BufferedReader
 import java.io.FileReader
 import joptsimple._
-import kafka.utils.{Logging, ZkUtils,ZKStringSerializer}
+import kafka.utils.{Logging, ZkUtils,ZKStringSerializer, CommandLineUtils}
 import org.I0Itec.zkclient.ZkClient
 
 
@@ -52,6 +52,9 @@ object ImportZkOffsets extends Logging {
                             .withRequiredArg()
                             .ofType(classOf[String])
     parser.accepts("help", "Print this message.")
+    
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Import offsets to zookeeper from files.")
             
     val options = parser.parse(args : _*)
     
@@ -60,13 +63,7 @@ object ImportZkOffsets extends Logging {
        System.exit(0)
     }
     
-    for (opt <- List(inFileOpt)) {
-      if (!options.has(opt)) {
-        System.err.println("Missing required argument: %s".format(opt))
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt)
     
     val zkConnect           = options.valueOf(zkConnectOpt)
     val partitionOffsetFile = options.valueOf(inFileOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 747a675..1d1a120 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -26,7 +26,7 @@ import joptsimple.OptionParser
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.math._
-import kafka.utils.Logging
+import kafka.utils.{CommandLineUtils, Logging}
 
 object JmxTool extends Logging {
 
@@ -63,6 +63,9 @@ object JmxTool extends Logging {
         .describedAs("service-url")
         .ofType(classOf[String])
         .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
+        
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard output.")
 
     val options = parser.parse(args : _*)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index e75c4f8..7638391 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -95,6 +95,9 @@ object MirrorMaker extends Logging {
             .ofType(classOf[String])
 
     val helpOpt = parser.accepts("help", "Print this message.")
+    
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.")
 
     val options = parser.parse(args : _*)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 95cfbc1..fc3e724 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import kafka.metrics.KafkaMetricsReporter
 import kafka.producer.{OldProducer, NewShinyProducer}
-import kafka.utils.{VerifiableProperties, Logging}
+import kafka.utils.{VerifiableProperties, Logging, CommandLineUtils}
 import kafka.message.CompressionCodec
 import kafka.serializer._
 
@@ -123,13 +123,8 @@ object ProducerPerformance extends Logging {
     val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
 
     val options = parser.parse(args: _*)
-    for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) {
-      if (!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt)
+
     val topicsStr = options.valueOf(topicsOpt)
     val topics = topicsStr.split(",")
     val numMessages = options.valueOf(numMessagesOpt).longValue

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index eb71e49..69be31c 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -105,13 +105,9 @@ object ReplayLogProducer extends Logging {
     val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
 
     val options = parser.parse(args : _*)
-    for(arg <- List(brokerListOpt, inputTopicOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt)
+
     val zkConnect = options.valueOf(zkConnectOpt)
     val brokerList = options.valueOf(brokerListOpt)
     val numMessages = options.valueOf(numMessagesOpt).intValue
@@ -121,7 +117,7 @@ object ReplayLogProducer extends Logging {
     val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
     val isSync = options.has(syncOpt)
     import scala.collection.JavaConversions._
-    val producerProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
+    val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 91f0728..c040f49 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -92,7 +92,9 @@ object ReplicaVerificationTool extends Logging {
                          .describedAs("ms")
                          .ofType(classOf[java.lang.Long])
                          .defaultsTo(30 * 1000L)
-
+                         
+   if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.")
 
     val options = parser.parse(args : _*)
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 8b8c472..7602b8d 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -141,13 +141,8 @@ object SimpleConsumerPerformance {
 
     val options = parser.parse(args : _*)
 
-    for(arg <- List(topicOpt, urlOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, urlOpt)
+
     val url = new URI(options.valueOf(urlOpt))
     val fetchSize = options.valueOf(fetchSizeOpt).intValue
     val fromLatest = options.has(resetBeginningOffsetOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 747e072..36314f4 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -93,15 +93,12 @@ object SimpleConsumerShell extends Logging {
         "skip it instead of halt.")
     val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
         "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages")
+        
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.")
 
     val options = parser.parse(args : _*)
-    for(arg <- List(brokerListOpt, topicOpt, partitionIdOpt)) {
-      if(!options.has(arg)) {
-        error("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, partitionIdOpt)
 
     val topic = options.valueOf(topicOpt)
     val partitionId = options.valueOf(partitionIdOpt).intValue()
@@ -117,7 +114,7 @@ object SimpleConsumerShell extends Logging {
     val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt)
 
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
-    val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
+    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
 
     val fetchRequestBuilder = new FetchRequestBuilder()
                        .clientId(clientId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index 97970fb..d298e7e 100644
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -22,7 +22,7 @@ import scala.util.matching.Regex
 import collection.mutable
 import java.util.Date
 import java.text.SimpleDateFormat
-import kafka.utils.Logging
+import kafka.utils.{Logging, CommandLineUtils}
 import kafka.common.Topic
 import java.io.{BufferedOutputStream, OutputStream}
 
@@ -83,6 +83,9 @@ object StateChangeLogMerger extends Logging {
                               .describedAs("end timestamp in the format " + dateFormat)
                               .ofType(classOf[String])
                               .defaultsTo("9999-12-31 23:59:59,999")
+                              
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "A tool for merging the log files from several brokers to reconstruct a unified history of what happened.")
 
 
     val options = parser.parse(args : _*)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
index 595dc7c..1d4ea93 100644
--- a/core/src/main/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
@@ -87,15 +87,15 @@ object TestLogCleaning {
     
     val options = parser.parse(args:_*)
     
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "An integration test for log cleaning.")
+    
     if(options.has(dumpOpt)) {
       dumpLog(new File(options.valueOf(dumpOpt)))
       System.exit(0)
     }
     
-    if(!options.has(brokerOpt) || !options.has(zkConnectOpt) || !options.has(numMessagesOpt)) {
-      parser.printHelpOn(System.err)
-      System.exit(1)
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt, numMessagesOpt)
     
     // parse options
     val messages = options.valueOf(numMessagesOpt).longValue

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index 92c0d1f..aef8361 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import joptsimple.OptionParser
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer}
+import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, CommandLineUtils}
 
 object VerifyConsumerRebalance extends Logging {
   def main(args: Array[String]) {
@@ -30,6 +30,9 @@ object VerifyConsumerRebalance extends Logging {
     val groupOpt = parser.accepts("group", "Consumer group.").
       withRequiredArg().ofType(classOf[String])
     parser.accepts("help", "Print this message.")
+    
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "Validate that all partitions have a consumer for a given consumer group.")
 
     val options = parser.parse(args : _*)
 
@@ -38,12 +41,7 @@ object VerifyConsumerRebalance extends Logging {
       System.exit(0)
     }
 
-    for (opt <- List(groupOpt))
-      if (!options.has(opt)) {
-        System.err.println("Missing required argument: %s".format(opt))
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
+    CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
 
     val zkConnect = options.valueOf(zkConnectOpt)
     val group = options.valueOf(groupOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/main/scala/kafka/utils/CommandLineUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index c1d8ba5..1ba605c 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -25,37 +25,49 @@ import scala.collection.Set
  */
 object CommandLineUtils extends Logging {
 
+  /**
+   * Check that all the listed options are present
+   */
   def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
     for(arg <- required) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
+      if(!options.has(arg))
+        printUsageAndDie(parser, "Missing required argument \"" + arg + "\"")
     }
   }
   
+  /**
+   * Check that none of the listed options are present
+   */
   def checkInvalidArgs(parser: OptionParser, options: OptionSet, usedOption: OptionSpec[_], invalidOptions: Set[OptionSpec[_]]) {
     if(options.has(usedOption)) {
       for(arg <- invalidOptions) {
-        if(options.has(arg)) {
-          System.err.println("Option \"" + usedOption + "\" can't be used with option\"" + arg + "\"")
-          parser.printHelpOn(System.err)
-          System.exit(1)
-        }
+        if(options.has(arg))
+          printUsageAndDie(parser, "Option \"" + usedOption + "\" can't be used with option\"" + arg + "\"")
       }
     }
   }
+  
+  /**
+   * Print usage and exit
+   */
+  def printUsageAndDie(parser: OptionParser, message: String) {
+    System.err.println(message)
+    parser.printHelpOn(System.err)
+    System.exit(1)
+  }
 
-   def parseCommandLineArgs(args: Iterable[String]): Properties = {
-     val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
-     if(!splits.forall(_.length == 2)) {
-       System.err.println("Invalid command line properties: " + args.mkString(" "))
-       System.exit(1)
-     }
-     val props = new Properties
-     for(a <- splits)
-       props.put(a(0), a(1))
-     props
-   }
- }
\ No newline at end of file
+  /**
+   * Parse key-value pairs in the form key=value
+   */
+  def parseKeyValueArgs(args: Iterable[String]): Properties = {
+    val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
+    if(!splits.forall(_.length == 2)) {
+      System.err.println("Invalid command line properties: " + args.mkString(" "))
+      System.exit(1)
+    }
+    val props = new Properties
+    for(a <- splits)
+      props.put(a(0), a(1))
+    props
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b0ae4bb/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index eeb8c88..7211c25 100644
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -83,13 +83,7 @@ object TestLinearWriteSpeed {
                           
     val options = parser.parse(args : _*)
     
-    for(arg <- List(bytesOpt, sizeOpt, filesOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"") 
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt, filesOpt)
 
     var bytesToWrite = options.valueOf(bytesOpt).longValue
     val bufferSize = options.valueOf(sizeOpt).intValue


Mime
View raw message