kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-6676: Ensure Kafka chroot exists in system tests and use chroot on one test with security parameterizations (#4729)
Date Mon, 19 Mar 2018 15:24:11 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new e520589  KAFKA-6676: Ensure Kafka chroot exists in system tests and use chroot on one test with security parameterizations (#4729)
e520589 is described below

commit e5205897d4957ebdea5f248e5367b71c69fc12b6
Author: Ewen Cheslack-Postava <me@ewencp.org>
AuthorDate: Mon Mar 19 03:37:02 2018 -0700

    KAFKA-6676: Ensure Kafka chroot exists in system tests and use chroot on one test with security parameterizations (#4729)
    
    Ensures Kafka chroot exists in ZK when starting KafkaService so commands that use ZK and are executed before the first Kafka broker starts do not fail due to the missing chroot.
    
    Also uses chroot with one test that also has security parameterizations so Kafka's test suite exercises these combinations. Previously no tests were exercising chroots.
    
    Changes were validated using sanity_checks which include the chroot-ed test as well as some non-chroot-ed tests.
---
 .../sanity_checks/test_console_consumer.py         |  2 +-
 tests/kafkatest/services/kafka/kafka.py            | 12 +++++++++
 tests/kafkatest/services/zookeeper.py              | 30 +++++++++++++++++-----
 3 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 066d6d4..537755d 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -36,7 +36,7 @@ class ConsoleConsumerTest(Test):
 
         self.topic = "topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka",
                                   topics={self.topic: {"partitions": 1, "replication-factor": 1}})
         self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, new_consumer=False)
 
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 619da9f..22674b9 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -163,6 +163,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.open_port(self.interbroker_security_protocol)
 
         self.start_minikdc(add_principals)
+        self._ensure_zk_chroot()
+
         Service.start(self)
 
         self.logger.info("Waiting for brokers to register at ZK")
@@ -183,6 +185,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                 topic_cfg["topic"] = topic
                 self.create_topic(topic_cfg)
 
+    def _ensure_zk_chroot(self):
+        self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot)
+        if self.zk_chroot:
+            if not self.zk_chroot.startswith('/'):
+                raise Exception("Zookeeper chroot must start with '/' but found " + self.zk_chroot)
+
+            parts = self.zk_chroot.split('/')[1:]
+            for i in range(len(parts)):
+                self.zk.create('/' + '/'.join(parts[:i+1]))
+
     def set_protocol_and_port(self, node):
         listeners = []
         advertised_listeners = []
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index b181a12..5bda867 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -103,7 +103,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         idx = self.idx(node)
         self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
         node.account.kill_java_processes(self.java_class_name(), allow_fail=False)
-        node.account.kill_java_processes(self.java_query_class_name(), allow_fail=False)
+        node.account.kill_java_processes(self.java_cli_class_name(), allow_fail=False)
         wait_until(lambda: not self.alive(node), timeout_sec=5, err_msg="Timed out waiting for zookeeper to stop.")
 
     def clean_node(self, node):
@@ -113,7 +113,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
                              (self.__class__.__name__, node.account))
         node.account.kill_java_processes(self.java_class_name(),
                                          clean_shutdown=False, allow_fail=True)
-        node.account.kill_java_processes(self.java_query_class_name(),
+        node.account.kill_java_processes(self.java_cli_class_name(),
                                          clean_shutdown=False, allow_fail=False)
         node.account.ssh("rm -rf -- %s" % ZookeeperService.ROOT, allow_fail=False)
 
@@ -134,18 +134,21 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
                        (self.path.script("zookeeper-security-migration.sh", node), zk_acl, self.connect_setting())
         node.account.ssh(la_migra_cmd)
 
+    def _check_chroot(self, chroot):
+        if chroot and not chroot.startswith("/"):
+            raise Exception("ZK chroot must start with '/', invalid chroot: %s" % chroot)
+
     def query(self, path, chroot=None):
         """
         Queries zookeeper for data associated with 'path' and returns all fields in the schema
         """
-        if chroot and not chroot.startswith("/"):
-            raise Exception("ZK chroot must start with '/', invalid chroot: %s" % chroot)
+        self._check_chroot(chroot)
 
         chroot_path = ('' if chroot is None else chroot) + path
 
         kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
         cmd = "%s %s -server %s get %s" % \
-              (kafka_run_class, self.java_query_class_name(), self.connect_setting(), chroot_path)
+              (kafka_run_class, self.java_cli_class_name(), self.connect_setting(), chroot_path)
         self.logger.debug(cmd)
 
         node = self.nodes[0]
@@ -158,10 +161,25 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
                     result = match.groups()[0]
         return result
 
+    def create(self, path, chroot=None):
+        """
+        Create an znode at the given path
+        """
+        self._check_chroot(chroot)
+
+        chroot_path = ('' if chroot is None else chroot) + path
+
+        kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
+        cmd = "%s %s -server %s create %s ''" % \
+              (kafka_run_class, self.java_cli_class_name(), self.connect_setting(), chroot_path)
+        self.logger.debug(cmd)
+        output = self.nodes[0].account.ssh_output(cmd)
+        self.logger.debug(output)
+
     def java_class_name(self):
         """ The class name of the Zookeeper quorum peers. """
         return "org.apache.zookeeper.server.quorum.QuorumPeerMain"
 
-    def java_query_class_name(self):
+    def java_cli_class_name(self):
         """ The class name of the Zookeeper tool within Kafka. """
         return "kafka.tools.ZooKeeperMainWrapper"

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message