kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-2825: Add controller failover to existing replication tests
Date Thu, 03 Dec 2015 18:43:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b09663eee -> 5b5f6bbe6


KAFKA-2825: Add controller failover to existing replication tests

Author: Anna Povzner <anna@confluent.io>

Reviewers: Geoff Anderson

Closes #618 from apovzner/kafka_2825_01
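
For anyone reproducing this locally: these system tests are driven by ducktape. A hedged invocation sketch (paths per this repo's layout; flags may vary by ducktape version):

    # Run the replication test module through ducktape (illustrative):
    ducktape tests/kafkatest/tests/replication_test.py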


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b5f6bbe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b5f6bbe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b5f6bbe

Branch: refs/heads/trunk
Commit: 5b5f6bbe68a82ce5eae946e0a1a199e9713a6ff7
Parents: b09663e
Author: Anna Povzner <anna@confluent.io>
Authored: Thu Dec 3 10:43:44 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Dec 3 10:43:44 2015 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/kafka/kafka.py   | 36 ++++++++-------
 tests/kafkatest/services/zookeeper.py     | 22 ++++++++-
 tests/kafkatest/tests/replication_test.py | 62 +++++++++++++++++---------
 3 files changed, 81 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5b5f6bbe/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 809e87f..b2dc260 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -321,21 +321,9 @@ class KafkaService(JmxMixin, Service):
     def leader(self, topic, partition=0):
         """ Get the leader replica for the given topic and partition.
         """
-        kafka_dir = KAFKA_TRUNK
-        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s
" %\
-              (kafka_dir, self.zk.connect_setting())
-        cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
-        self.logger.debug(cmd)
-
-        node = self.zk.nodes[0]
-        self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic))
-        partition_state = None
-        for line in node.account.ssh_capture(cmd):
-            # loop through all lines in the output, but only hold on to the first match
-            if partition_state is None:
-                match = re.match("^({.+})$", line)
-                if match is not None:
-                    partition_state = match.groups()[0]
+        self.logger.debug("Querying zookeeper to find leader replica for topic: \n%s" % (topic))
+        zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
+        partition_state = self.zk.query(zk_path)
 
         if partition_state is None:
             raise Exception("Error finding partition state for topic %s and partition %d."
% (topic, partition))
@@ -358,4 +346,20 @@ class KafkaService(JmxMixin, Service):
         if not port_mapping.open:
             raise ValueError("We are retrieving bootstrap servers for the port: %s which
is not currently open. - " % str(port_mapping))
 
-        return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes])
\ No newline at end of file
+        return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes])
+
+    def controller(self):
+        """ Get the controller node
+        """
+        self.logger.debug("Querying zookeeper to find controller broker")
+        controller_info = self.zk.query("/controller")
+
+        if controller_info is None:
+            raise Exception("Error finding controller info")
+
+        controller_info = json.loads(controller_info)
+        self.logger.debug(controller_info)
+
+        controller_idx = int(controller_info["brokerid"])
+        self.logger.info("Controller's ID: %d" % (controller_idx))
+        return self.get_node(controller_idx)
\ No newline at end of file
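
For context on the new controller() helper above: it assumes the /controller znode holds a JSON object with a "brokerid" field, e.g. {"version":1,"brokerid":1,"timestamp":"1449167024000"} (values illustrative). A minimal usage sketch, assuming a running KafkaService instance named kafka:

    # Hypothetical usage, not part of this commit: locate the current
    # controller broker and shut it down cleanly.
    import signal

    controller_node = kafka.controller()   # parses "brokerid" from /controller
    kafka.signal_node(controller_node, sig=signal.SIGTERM)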

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b5f6bbe/tests/kafkatest/services/zookeeper.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index cae4268..5b64750 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -16,10 +16,11 @@
 
 from ducktape.services.service import Service
 
-from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
 
 import subprocess
 import time
+import re
 
 
 class ZookeeperService(Service):
@@ -83,3 +84,22 @@ class ZookeeperService(Service):
 
     def connect_setting(self):
         return ','.join([node.account.hostname + ':2181' for node in self.nodes])
+
+    def query(self, path):
+        """
+        Queries zookeeper for data associated with 'path' and returns all fields in the schema
+        """
+        kafka_dir = KAFKA_TRUNK
+        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s
get %s" % \
+              (kafka_dir, self.connect_setting(), path)
+        self.logger.debug(cmd)
+
+        node = self.nodes[0]
+        result = None
+        for line in node.account.ssh_capture(cmd):
+            # loop through all lines in the output, but only hold on to the first match
+            if result is None:
+                match = re.match("^({.+})$", line)
+                if match is not None:
+                    result = match.groups()[0]
+        return result
\ No newline at end of file
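
Note that query() keeps only the first line of ZooKeeperMainWrapper output matching the regex ^({.+})$, so callers get back a raw JSON string, or None if nothing matched. A minimal calling sketch, assuming a running ZookeeperService named zk and an existing topic test_topic:

    # Illustrative caller, not part of this commit: read partition state
    # and extract the leader's broker id.
    import json

    state_json = zk.query("/brokers/topics/test_topic/partitions/0/state")
    if state_json is not None:
        # e.g. {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1,2]}
        leader_id = int(json.loads(state_json)["leader"])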

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b5f6bbe/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
index a8f2337..4909a9a 100644
--- a/tests/kafkatest/tests/replication_test.py
+++ b/tests/kafkatest/tests/replication_test.py
@@ -25,41 +25,55 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 
 import signal
 
+def broker_node(test, broker_type):
+    """ Discover node of requested type. For leader type, discovers leader for our topic
and partition 0
+    """
+    if broker_type == "leader":
+        node = test.kafka.leader(test.topic, partition=0)
+    elif broker_type == "controller":
+        node = test.kafka.controller()
+    else:
+        raise Exception("Unexpected broker type %s." % (broker_type))
+
+    return node
 
-def clean_shutdown(test):
-    """Discover leader node for our topic and shut it down cleanly."""
-    test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGTERM)
+def clean_shutdown(test, broker_type):
+    """Discover broker node of requested type and shut it down cleanly.
+    """
+    node = broker_node(test, broker_type)
+    test.kafka.signal_node(node, sig=signal.SIGTERM)
 
 
-def hard_shutdown(test):
-    """Discover leader node for our topic and shut it down with a hard kill."""
-    test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGKILL)
+def hard_shutdown(test, broker_type):
+    """Discover broker node of requested type and shut it down with a hard kill."""
+    node = broker_node(test, broker_type)
+    test.kafka.signal_node(node, sig=signal.SIGKILL)
 
 
-def clean_bounce(test):
+def clean_bounce(test, broker_type):
     """Chase the leader of one partition and restart it cleanly."""
     for i in range(5):
-        prev_leader_node = test.kafka.leader(topic=test.topic, partition=0)
-        test.kafka.restart_node(prev_leader_node, clean_shutdown=True)
+        prev_broker_node = broker_node(test, broker_type)
+        test.kafka.restart_node(prev_broker_node, clean_shutdown=True)
 
 
-def hard_bounce(test):
+def hard_bounce(test, broker_type):
     """Chase the leader and restart it with a hard kill."""
     for i in range(5):
-        prev_leader_node = test.kafka.leader(topic=test.topic, partition=0)
-        test.kafka.signal_node(prev_leader_node, sig=signal.SIGKILL)
+        prev_broker_node = broker_node(test, broker_type)
+        test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
 
         # Since this is a hard kill, we need to make sure the process is down and that
-        # zookeeper and the broker cluster have registered the loss of the leader.
-        # Waiting for a new leader to be elected on the topic-partition is a reasonable heuristic for this.
+        # zookeeper and the broker cluster have registered the loss of the leader/controller.
+        # Waiting for a new leader for the topic-partition/controller to be elected is a reasonable heuristic for this.
 
-        def leader_changed():
-            current_leader = test.kafka.leader(topic=test.topic, partition=0)
-            return current_leader is not None and current_leader != prev_leader_node
+        def role_reassigned():
+            current_elected_broker = broker_node(test, broker_type)
+            return current_elected_broker is not None and current_elected_broker != prev_broker_node
 
-        wait_until(lambda: len(test.kafka.pids(prev_leader_node)) == 0, timeout_sec=5)
-        wait_until(leader_changed, timeout_sec=10, backoff_sec=.5)
-        test.kafka.start_node(prev_leader_node)
+        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0, timeout_sec=5)
+        wait_until(role_reassigned, timeout_sec=10, backoff_sec=.5)
+        test.kafka.start_node(prev_broker_node)
 
 failures = {
     "clean_shutdown": clean_shutdown,
@@ -108,8 +122,12 @@ class ReplicationTest(ProduceConsumeValidateTest):
 
 
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+            broker_type=["leader"],
             security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
-    def test_replication_with_broker_failure(self, failure_mode, security_protocol):
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+            broker_type=["controller"],
+            security_protocol=["PLAINTEXT", "SASL_SSL"])
+    def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type):
         """Replication tests.
         These tests verify that replication provides simple durability guarantees by checking that data acked by
         brokers is still available for consumption in the face of various failure scenarios.
@@ -130,4 +148,4 @@ class ReplicationTest(ProduceConsumeValidateTest):
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
         self.kafka.start()
         
-        self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self))
+        self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))

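As I understand ducktape's @matrix, each decorator expands into the cross product of its keyword lists, so the change above yields 16 leader cases plus 8 controller cases. A small sketch of the equivalent expansion:

    # Illustrative expansion of the two @matrix blocks (16 + 8 = 24 cases):
    from itertools import product

    failure_modes = ["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"]
    cases  = list(product(failure_modes, ["leader"],
                          ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]))
    cases += list(product(failure_modes, ["controller"], ["PLAINTEXT", "SASL_SSL"]))
    assert len(cases) == 24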
