kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6676) System tests do not handle ZK chroot properly with SCRAM
Date Mon, 19 Mar 2018 10:38:00 GMT

https://issues.apache.org/jira/browse/KAFKA-6676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404613#comment-16404613

ASF GitHub Bot commented on KAFKA-6676:

rajinisivaram closed pull request #4729: KAFKA-6676: Ensure Kafka chroot exists in system tests and use chroot on one test with security parameterizations
URL: https://github.com/apache/kafka/pull/4729

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 066d6d42c14..537755d5820 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -36,7 +36,7 @@ def __init__(self, test_context):
         self.topic = "topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka",
                                   topics={self.topic: {"partitions": 1, "replication-factor":
         self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, new_consumer=False)
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index e563ab82dab..c4d4b247557 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -163,6 +163,8 @@ def start(self, add_principals=""):
+        self._ensure_zk_chroot()
         self.logger.info("Waiting for brokers to register at ZK")
@@ -183,6 +185,16 @@ def start(self, add_principals=""):
                 topic_cfg["topic"] = topic
+    def _ensure_zk_chroot(self):
+        self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot)
+        if self.zk_chroot:
+            if not self.zk_chroot.startswith('/'):
+                raise Exception("Zookeeper chroot must start with '/' but found " + self.zk_chroot)
+            parts = self.zk_chroot.split('/')[1:]
+            for i in range(len(parts)):
+                self.zk.create('/' + '/'.join(parts[:i+1]))
     def set_protocol_and_port(self, node):
         listeners = []
         advertised_listeners = []
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index b181a12210a..5bda867ed7c 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -103,7 +103,7 @@ def stop_node(self, node):
         idx = self.idx(node)
         self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
         node.account.kill_java_processes(self.java_class_name(), allow_fail=False)
-        node.account.kill_java_processes(self.java_query_class_name(), allow_fail=False)
+        node.account.kill_java_processes(self.java_cli_class_name(), allow_fail=False)
         wait_until(lambda: not self.alive(node), timeout_sec=5, err_msg="Timed out waiting for zookeeper to stop.")
     def clean_node(self, node):
@@ -113,7 +113,7 @@ def clean_node(self, node):
                              (self.__class__.__name__, node.account))
                                          clean_shutdown=False, allow_fail=True)
-        node.account.kill_java_processes(self.java_query_class_name(),
+        node.account.kill_java_processes(self.java_cli_class_name(),
                                          clean_shutdown=False, allow_fail=False)
         node.account.ssh("rm -rf -- %s" % ZookeeperService.ROOT, allow_fail=False)
@@ -134,18 +134,21 @@ def zookeeper_migration(self, node, zk_acl):
                        (self.path.script("zookeeper-security-migration.sh", node), zk_acl,
+    def _check_chroot(self, chroot):
+        if chroot and not chroot.startswith("/"):
+            raise Exception("ZK chroot must start with '/', invalid chroot: %s" % chroot)
     def query(self, path, chroot=None):
         Queries zookeeper for data associated with 'path' and returns all fields in the schema
-        if chroot and not chroot.startswith("/"):
-            raise Exception("ZK chroot must start with '/', invalid chroot: %s" % chroot)
+        self._check_chroot(chroot)
         chroot_path = ('' if chroot is None else chroot) + path
         kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
         cmd = "%s %s -server %s get %s" % \
-              (kafka_run_class, self.java_query_class_name(), self.connect_setting(), chroot_path)
+              (kafka_run_class, self.java_cli_class_name(), self.connect_setting(), chroot_path)
         node = self.nodes[0]
@@ -158,10 +161,25 @@ def query(self, path, chroot=None):
                     result = match.groups()[0]
         return result
+    def create(self, path, chroot=None):
+        """
+        Create a znode at the given path
+        """
+        self._check_chroot(chroot)
+        chroot_path = ('' if chroot is None else chroot) + path
+        kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
+        cmd = "%s %s -server %s create %s ''" % \
+              (kafka_run_class, self.java_cli_class_name(), self.connect_setting(), chroot_path)
+        self.logger.debug(cmd)
+        output = self.nodes[0].account.ssh_output(cmd)
+        self.logger.debug(output)
     def java_class_name(self):
         """ The class name of the Zookeeper quorum peers. """
         return "org.apache.zookeeper.server.quorum.QuorumPeerMain"
-    def java_query_class_name(self):
+    def java_cli_class_name(self):
         """ The class name of the Zookeeper tool within Kafka. """
         return "kafka.tools.ZooKeeperMainWrapper"
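For reference, the ancestor-path expansion that `_ensure_zk_chroot` performs in the patch above (splitting the chroot and creating each parent in order) can be sketched as a standalone helper. The function name here is hypothetical, not part of the patch; the patch calls `self.zk.create(...)` on each returned path.

```python
def chroot_ancestor_paths(zk_chroot):
    """Expand a ZK chroot like '/a/b/c' into the ordered list of znode
    paths that must exist: ['/a', '/a/b', '/a/b/c'].

    Returns an empty list for a None or empty chroot, mirroring the
    'if self.zk_chroot:' guard in KafkaService._ensure_zk_chroot.
    """
    if not zk_chroot:
        return []
    if not zk_chroot.startswith('/'):
        # Same validation as the patch: a chroot must be an absolute path.
        raise ValueError("Zookeeper chroot must start with '/' but found " + zk_chroot)
    parts = zk_chroot.split('/')[1:]
    return ['/' + '/'.join(parts[:i + 1]) for i in range(len(parts))]
```

Creating parents first matters because ZooKeeper's plain `create` is not recursive: creating `/a/b` fails with `NoNode` if `/a` does not exist.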


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

> System tests do not handle ZK chroot properly with SCRAM
> --------------------------------------------------------
>                 Key: KAFKA-6676
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6676
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>            Priority: Major
> This is related to the issue observed in KAFKA-6672. There, we are now automatically
creating parent nodes if they do not exist. However, if using a chroot within ZK and that
chroot does not yet exist, you get an error message about "Path length must be > 0" as
it tries to create all the parent paths.
> It would probably be better to be able to detect this issue and account for it, but currently
system test code will fail if you use SCRAM and a chroot because while Kafka will create the
chroot when it starts up, there are some commands related to security that may need to be
executed before that and assume the chroot will already be there.
> We're currently missing this because while the chroot option is there, nothing in Kafka's
tests is currently exercising it. So given what is apparently a common assumption in tools
that the chroot already exists (since I think the core kafka server is the only thing that
handles creating it if needed), I think the fix here would be two-fold:
>  # Make KafkaService ensure the chroot exists before running any commands that might
need it.
>  # On at least one test that exercises security support, use a zk_chroot so that functionality
is at least reasonably well exercised.
> It would be good to have this in both trunk and 1.1 branches.
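The `create` helper added in the patch drives `kafka.tools.ZooKeeperMainWrapper` over SSH to pre-create each path. Assuming a standard Kafka distribution and a local ZooKeeper (hostname and port are illustrative), the equivalent manual step before running any SCRAM-related tooling would be:

```shell
# Illustrative only: pre-create the chroot znode so that security commands
# (e.g. SCRAM credential setup) issued before broker startup can find it.
bin/zookeeper-shell.sh localhost:2181 create /kafka ""
```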

This message was sent by Atlassian JIRA
