kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: MINOR: Do not wait for first line of console consumer output since we now have a more reliable test using JMX
Date Tue, 18 Jul 2017 04:24:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 28c83d966 -> d663005fd


MINOR: Do not wait for first line of console consumer output since we now have a more reliable
test using JMX

Waiting for the first line of output was added in KAFKA-2527 when JmxMixin was originally
added as a heuristic to
determine when the process was ready. We've since determined this is not good enough given
JmxTool's limitations
and now include a separate, more reliable check before starting JmxTool. This check is also
dangerous since a
consumer that is started before data is available in the topic, it won't output anything to
stdout and only logs
errors to a separate log file. This means we may have a long delay between starting the process
and starting JMX
monitoring.

Since we have a more reliable check for liveness via JMX now (and in cases that need it, partition
assignment
metrics via JMX), we should no longer need to wait for the first line of output.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Apurva Mehta <apurva@confluent.io>

Closes #3447 from ewencp/dont-wait-first-line-console-consumer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d663005f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d663005f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d663005f

Branch: refs/heads/trunk
Commit: d663005fddcebafba439473299dc4c6ad74c966c
Parents: 28c83d9
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Mon Jul 17 21:24:45 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Jul 17 21:24:45 2017 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py | 40 +++++++++++------------
 1 file changed, 19 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d663005f/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 6fad674..5a945e1 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -255,27 +255,25 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         self.logger.debug("Console consumer %d command: %s", idx, cmd)
 
         consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
-        first_line = next(consumer_output, None)
-
-        if first_line is not None:
-            self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
-            self.init_jmx_attributes()
-            self.start_jmx_tool(idx, node)
-
-            for line in itertools.chain([first_line], consumer_output):
-                msg = line.strip()
-                if msg == "shutdown_complete":
-                    # Note that we can only rely on shutdown_complete message if running
0.10.0 or greater
-                    if node in self.clean_shutdown_nodes:
-                        raise Exception("Unexpected shutdown event from consumer, already
shutdown. Consumer index: %d" % idx)
-                    self.clean_shutdown_nodes.add(node)
-                else:
-                    if self.message_validator is not None:
-                        msg = self.message_validator(msg)
-                    if msg is not None:
-                        self.messages_consumed[idx].append(msg)
-
-            self.read_jmx_output(idx, node)
+
+        self.init_jmx_attributes()
+        self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
+        self.start_jmx_tool(idx, node)
+
+        for line in consumer_output:
+            msg = line.strip()
+            if msg == "shutdown_complete":
+                # Note that we can only rely on shutdown_complete message if running 0.10.0
or greater
+                if node in self.clean_shutdown_nodes:
+                    raise Exception("Unexpected shutdown event from consumer, already shutdown.
Consumer index: %d" % idx)
+                self.clean_shutdown_nodes.add(node)
+            else:
+                if self.message_validator is not None:
+                    msg = self.message_validator(msg)
+                if msg is not None:
+                    self.messages_consumed[idx].append(msg)
+
+        self.read_jmx_output(idx, node)
 
     def start_node(self, node):
         BackgroundThreadService.start_node(self, node)


Mime
View raw message