kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-5742: support ZK chroot in system tests
Date Thu, 17 Aug 2017 22:14:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 334a30bcf -> 520e651d5


KAFKA-5742: support ZK chroot in system tests

Author: Xavier Léauté <xavier@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3677 from xvrl/support-zk-chroot-in-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/520e651d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/520e651d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/520e651d

Branch: refs/heads/trunk
Commit: 520e651d532558b346d08388cb9aa0ac6e156ece
Parents: 334a30b
Author: Xavier Léauté <xavier@confluent.io>
Authored: Thu Aug 17 15:14:32 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Aug 17 15:14:32 2017 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py    |  2 +-
 tests/kafkatest/services/kafka/kafka.py         | 48 +++++++++++---------
 .../performance/consumer_performance.py         |  2 +-
 .../services/performance/end_to_end_latency.py  |  2 +-
 tests/kafkatest/services/security/kafka_acls.py |  6 +--
 .../templates/mirror_maker_consumer.properties  |  2 +-
 tests/kafkatest/services/zookeeper.py           | 18 ++++++--
 tests/kafkatest/tests/client/quota_test.py      |  2 +-
 .../tests/core/security_rolling_upgrade_test.py |  4 +-
 tests/kafkatest/tests/core/upgrade_test.py      | 16 ++-----
 .../core/zookeeper_security_upgrade_test.py     |  2 +-
 11 files changed, 56 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/520e651d/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 91066d3..312131e 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -170,7 +170,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
     def start_cmd(self, node):
         """Return the start command appropriate for the given node."""
         args = self.args.copy()
-        args['zk_connect'] = self.kafka.zk.connect_setting()
+        args['zk_connect'] = self.kafka.zk_connect_setting()
         args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
         args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
         args['log_dir'] = ConsoleConsumer.LOG_DIR

http://git-wip-us.apache.org/repos/asf/kafka/blob/520e651d/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index ee60bab..febfe55 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -73,7 +73,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT,
interbroker_security_protocol=SecurityConfig.PLAINTEXT,
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
-                 jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=[]):
+                 jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=[],
zk_chroot=None):
         """
         :type context
         :type zk: ZookeeperService
@@ -95,6 +95,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.server_prop_overides = server_prop_overides
         self.log_level = "DEBUG"
         self.num_nodes = num_nodes
+        self.zk_chroot = zk_chroot
 
         #
         # In a heavily loaded and not very fast machine, it is
@@ -190,7 +191,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def prop_file(self, node):
         cfg = KafkaConfig(**node.config)
         cfg[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
-        cfg[config_property.ZOOKEEPER_CONNECT] = self.zk.connect_setting()
+        cfg[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting()
 
         for prop in self.server_prop_overides:
             cfg[prop[0]] = prop[1]
@@ -222,7 +223,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))
 
         self.security_config.setup_node(node)
-        self.security_config.setup_credentials(node, self.path, self.zk.connect_setting(),
broker=True)
+        self.security_config.setup_credentials(node, self.path, self.zk_connect_setting(),
broker=True)
 
         cmd = self.start_cmd(node)
         self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account),
cmd))
@@ -233,7 +234,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         # Credentials for inter-broker communication are created before starting Kafka.
         # Client credentials are created after starting Kafka so that both loading of
         # existing credentials from ZK and dynamic update of credentials in Kafka are tested.
-        self.security_config.setup_credentials(node, self.path, self.zk.connect_setting(),
broker=False)
+        self.security_config.setup_credentials(node, self.path, self.zk_connect_setting(),
broker=False)
 
         self.start_jmx_tool(self.idx(node), node)
         if len(self.pids(node)) == 0:
@@ -287,7 +288,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
         cmd = kafka_topic_script + " "
         cmd += "--zookeeper %(zk_connect)s --create --topic %(topic)s " % {
-                'zk_connect': self.zk.connect_setting(),
+                'zk_connect': self.zk_connect_setting(),
                 'topic': topic_cfg.get("topic"),
            }
         if 'replica-assignment' in topic_cfg:
@@ -316,7 +317,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         if node is None:
             node = self.nodes[0]
         cmd = "%s --zookeeper %s --topic %s --describe" % \
-              (self.path.script("kafka-topics.sh", node), self.zk.connect_setting(), topic)
+              (self.path.script("kafka-topics.sh", node), self.zk_connect_setting(), topic)
         output = ""
         for line in node.account.ssh_capture(cmd):
             output += line
@@ -326,7 +327,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         if node is None:
             node = self.nodes[0]
         cmd = "%s --zookeeper %s --list" % \
-              (self.path.script("kafka-topics.sh", node), self.zk.connect_setting())
+              (self.path.script("kafka-topics.sh", node), self.zk_connect_setting())
         for line in node.account.ssh_capture(cmd):
             if not line.startswith("SLF4J"):
                 yield line.rstrip()
@@ -336,7 +337,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             node = self.nodes[0]
         self.logger.info("Altering message format version for topic %s with format %s", topic,
msg_format_version)
         cmd = "%s --zookeeper %s --entity-name %s --entity-type topics --alter --add-config
message.format.version=%s" % \
-              (self.path.script("kafka-configs.sh", node), self.zk.connect_setting(), topic,
msg_format_version)
+              (self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), topic,
msg_format_version)
         self.logger.info("Running alter message format command...\n%s" % cmd)
         node.account.ssh(cmd)
 
@@ -384,7 +385,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         # create command
         cmd = "echo %s > %s && " % (json_str, json_file)
         cmd += "%s " % self.path.script("kafka-reassign-partitions.sh", node)
-        cmd += "--zookeeper %s " % self.zk.connect_setting()
+        cmd += "--zookeeper %s " % self.zk_connect_setting()
         cmd += "--reassignment-json-file %s " % json_file
         cmd += "--verify "
         cmd += "&& sleep 1 && rm -f %s" % json_file
@@ -423,7 +424,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         # create command
         cmd = "echo %s > %s && " % (json_str, json_file)
         cmd += "%s " % self.path.script( "kafka-reassign-partitions.sh", node)
-        cmd += "--zookeeper %s " % self.zk.connect_setting()
+        cmd += "--zookeeper %s " % self.zk_connect_setting()
         cmd += "--reassignment-json-file %s " % json_file
         cmd += "--execute"
         if throttle is not None:
@@ -484,7 +485,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         """
         self.logger.debug("Querying zookeeper to find in-sync replicas for topic %s and partition
%d" % (topic, partition))
         zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
-        partition_state = self.zk.query(zk_path)
+        partition_state = self.zk.query(zk_path, chroot=self.zk_chroot)
 
         if partition_state is None:
             raise Exception("Error finding partition state for topic %s and partition %d."
% (topic, partition))
@@ -501,7 +502,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         """
         self.logger.debug("Querying zookeeper to find assigned replicas for topic %s and
partition %d" % (topic, partition))
         zk_path = "/brokers/topics/%s" % (topic)
-        assignemnt = self.zk.query(zk_path)
+        assignemnt = self.zk.query(zk_path, chroot=self.zk_chroot)
 
         if assignemnt is None:
             raise Exception("Error finding partition state for topic %s and partition %d."
% (topic, partition))
@@ -519,7 +520,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         """
         self.logger.debug("Querying zookeeper to find leader replica for topic %s and partition
%d" % (topic, partition))
         zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
-        partition_state = self.zk.query(zk_path)
+        partition_state = self.zk.query(zk_path, chroot=self.zk_chroot)
 
         if partition_state is None:
             raise Exception("Error finding partition state for topic %s and partition %d."
% (topic, partition))
@@ -535,11 +536,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         """ Get the current cluster id
         """
         self.logger.debug("Querying ZooKeeper to retrieve cluster id")
-        cluster = json.loads(self.zk.query("/cluster/id"))
-        if cluster is None:
-            raise Exception("Error querying ZK for cluster id.")
+        cluster = self.zk.query("/cluster/id", chroot=self.zk_chroot)
 
-        return cluster['id']
+        try:
+            return json.loads(cluster)['id'] if cluster else None
+        except:
+            self.logger.debug("Data in /cluster/id znode could not be parsed. Data = %s"
% cluster)
+            raise
 
     def list_consumer_groups(self, node=None, new_consumer=True, command_config=None):
         """ Get list of consumer groups.
@@ -559,7 +562,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                    self.bootstrap_servers(self.security_protocol),
                    command_config)
         else:
-            cmd = "%s --zookeeper %s %s --list" % (consumer_group_script, self.zk.connect_setting(),
command_config)
+            cmd = "%s --zookeeper %s %s --list" % (consumer_group_script, self.zk_connect_setting(),
command_config)
         output = ""
         self.logger.debug(cmd)
         for line in node.account.ssh_capture(cmd):
@@ -585,7 +588,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                   (consumer_group_script, self.bootstrap_servers(self.security_protocol),
command_config, group)
         else:
             cmd = "%s --zookeeper %s %s --group %s --describe" % \
-                  (consumer_group_script, self.zk.connect_setting(), command_config, group)
+                  (consumer_group_script, self.zk_connect_setting(), command_config, group)
         output = ""
         self.logger.debug(cmd)
         for line in node.account.ssh_capture(cmd):
@@ -594,6 +597,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.debug(output)
         return output
 
+    def zk_connect_setting(self):
+        return self.zk.connect_setting(self.zk_chroot)
+
     def bootstrap_servers(self, protocol='PLAINTEXT', validate=True, offline_nodes=[]):
         """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
 
@@ -611,7 +617,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         """ Get the controller node
         """
         self.logger.debug("Querying zookeeper to find controller broker")
-        controller_info = self.zk.query("/controller")
+        controller_info = self.zk.query("/controller", chroot=self.zk_chroot)
 
         if controller_info is None:
             raise Exception("Error finding controller info")
@@ -628,7 +634,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         Check whether a broker is registered in Zookeeper
         """
         self.logger.debug("Querying zookeeper to see if broker %s is registered", node)
-        broker_info = self.zk.query("/brokers/ids/%s" % self.idx(node))
+        broker_info = self.zk.query("/brokers/ids/%s" % self.idx(node), chroot=self.zk_chroot)
         self.logger.debug("Broker info: %s", broker_info)
         return broker_info is not None
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/520e651d/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index e1cd3a0..69a8f71 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -109,7 +109,7 @@ class ConsumerPerformanceService(PerformanceService):
                 args['new-consumer'] = ""
             args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
         else:
-            args['zookeeper'] = self.kafka.zk.connect_setting()
+            args['zookeeper'] = self.kafka.zk_connect_setting()
 
         if self.fetch_size is not None:
             args['fetch-size'] = self.fetch_size

http://git-wip-us.apache.org/repos/asf/kafka/blob/520e651d/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index bdaf550..d0385f4 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -73,7 +73,7 @@ class EndToEndLatencyService(PerformanceService):
     def start_cmd(self, node):
         args = self.args.copy()
         args.update({
-            'zk_connect': self.kafka.zk.connect_setting(),
+            'zk_connect': self.kafka.zk_connect_setting(),
             'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
             'config_file': EndToEndLatencyService.CONFIG_FILE,
             'kafka_run_class': self.path.script("kafka-run-class.sh", node)

http://git-wip-us.apache.org/repos/asf/kafka/blob/520e651d/tests/kafkatest/services/security/kafka_acls.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/kafka_acls.py b/tests/kafkatest/services/security/kafka_acls.py
index 5fcb5e7..85d3c7d 100644
--- a/tests/kafkatest/services/security/kafka_acls.py
+++ b/tests/kafkatest/services/security/kafka_acls.py
@@ -20,9 +20,9 @@ class ACLs(KafkaPathResolverMixin):
     def __init__(self, context):
         self.context = context
 
-    def set_acls(self, protocol, kafka, zk, topic, group):
+    def set_acls(self, protocol, kafka, topic, group):
         node = kafka.nodes[0]
-        setting = zk.connect_setting()
+        setting = kafka.zk_connect_setting()
 
         # Set server ACLs
         kafka_principal = "User:CN=systemtest" if protocol == "SSL" else "User:kafka"
@@ -72,4 +72,4 @@ class ACLs(KafkaPathResolverMixin):
             'topic': topic,
             'group': group,
             'principal': principal
-        }
\ No newline at end of file
+        }

http://git-wip-us.apache.org/repos/asf/kafka/blob/520e651d/tests/kafkatest/services/templates/mirror_maker_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/mirror_maker_consumer.properties b/tests/kafkatest/services/templates/mirror_maker_consumer.properties
index 0e5b472..0da386e 100644
--- a/tests/kafkatest/services/templates/mirror_maker_consumer.properties
+++ b/tests/kafkatest/services/templates/mirror_maker_consumer.properties
@@ -17,7 +17,7 @@
 {% if new_consumer %}
 bootstrap.servers={{ source.bootstrap_servers(security_config.security_protocol) }}
 {% else %}
-zookeeper.connect={{ source.zk.connect_setting() }}
+zookeeper.connect={{ source.zk_connect_setting() }}
 zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
 {% endif %}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/520e651d/tests/kafkatest/services/zookeeper.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index 571956b..060d632 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -106,8 +106,13 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         node.account.kill_process("zookeeper", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False)
 
-    def connect_setting(self):
-        return ','.join([node.account.hostname + ':2181' for node in self.nodes])
+
+    def connect_setting(self, chroot=None):
+        if chroot and not chroot.startswith("/"):
+            raise Exception("ZK chroot must start with '/', invalid chroot: %s" % chroot)
+
+        chroot = '' if chroot is None else chroot
+        return ','.join([node.account.hostname + ':2181' + chroot for node in self.nodes])
 
     #
     # This call is used to simulate a rolling upgrade to enable/disable
@@ -118,13 +123,18 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
                        (self.path.script("zookeeper-security-migration.sh", node), zk_acl,
self.connect_setting())
         node.account.ssh(la_migra_cmd)
 
-    def query(self, path):
+    def query(self, path, chroot=None):
         """
         Queries zookeeper for data associated with 'path' and returns all fields in the schema
         """
+        if chroot and not chroot.startswith("/"):
+            raise Exception("ZK chroot must start with '/', invalid chroot: %s" % chroot)
+
+        chroot_path = ('' if chroot is None else chroot) + path
+
         kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
         cmd = "%s kafka.tools.ZooKeeperMainWrapper -server %s get %s" % \
-              (kafka_run_class, self.connect_setting(), path)
+              (kafka_run_class, self.connect_setting(), chroot_path)
         self.logger.debug(cmd)
 
         node = self.nodes[0]

http://git-wip-us.apache.org/repos/asf/kafka/blob/520e651d/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index baed837..ae86c28 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -78,7 +78,7 @@ class QuotaConfig(object):
     def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
         node = kafka.nodes[0]
         cmd = "%s --zookeeper %s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d"
% \
-              (kafka.path.script("kafka-configs.sh", node), kafka.zk.connect_setting(), producer_byte_rate,
consumer_byte_rate)
+              (kafka.path.script("kafka-configs.sh", node), kafka.zk_connect_setting(), producer_byte_rate,
consumer_byte_rate)
         cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
         if len(entity_args) > 2:
             cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])

http://git-wip-us.apache.org/repos/asf/kafka/blob/520e651d/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index a962a9d..ba014ea 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -77,8 +77,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
 
     def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
         self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
-        self.acls.set_acls(client_protocol, self.kafka, self.zk, self.topic, self.group)
-        self.acls.set_acls(broker_protocol, self.kafka, self.zk, self.topic, self.group)
+        self.acls.set_acls(client_protocol, self.kafka, self.topic, self.group)
+        self.acls.set_acls(broker_protocol, self.kafka, self.topic, self.group)
         self.bounce()
 
     def open_secured_port(self, client_protocol):

http://git-wip-us.apache.org/repos/asf/kafka/blob/520e651d/tests/kafkatest/tests/core/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
index 3e88755..c8cdac7 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -16,8 +16,6 @@
 from ducktape.mark import parametrize
 from ducktape.mark.resource import cluster
 
-import json
-
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.kafka import config_property
@@ -121,7 +119,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
                                            version=KafkaVersion(from_kafka_version))
 
         if from_kafka_version <= LATEST_0_10_0:
-            assert self.zk.query("/cluster/id") is None
+            assert self.kafka.cluster_id() is None
 
         # TODO - reduce the timeout
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
@@ -131,12 +129,6 @@ class TestUpgrade(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version,
                                                                                         to_message_format_version))
 
-        cluster_id_json = self.zk.query("/cluster/id")
-        assert cluster_id_json is not None
-        try:
-            cluster_id = json.loads(cluster_id_json)
-        except :
-            self.logger.debug("Data in /cluster/id znode could not be parsed. Data = %s"
% cluster_id_json)
-
-        self.logger.debug("Cluster id [%s]", cluster_id)
-        assert len(cluster_id["id"]) == 22
+        cluster_id = self.kafka.cluster_id()
+        assert cluster_id is not None
+        assert len(cluster_id) == 22

http://git-wip-us.apache.org/repos/asf/kafka/blob/520e651d/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
index e5e140b..235f2fa 100644
--- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
@@ -103,7 +103,7 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
         # set acls
         if self.is_secure:
             self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
-            self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group)
+            self.acls.set_acls(security_protocol, self.kafka, self.topic, self.group)
 
         if self.no_sasl:
             self.kafka.start()


Mime
View raw message