kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [kafka] branch trunk updated: MINOR: Add async and different sync startup modes in connect service test class
Date Mon, 22 Jan 2018 23:00:44 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 83cc138  MINOR: Add async and different sync startup modes in connect service test
class
83cc138 is described below

commit 83cc138e0c04a2f30f4536c27314890d06818190
Author: Konstantine Karantasis <konstantine@confluent.io>
AuthorDate: Mon Jan 22 15:00:31 2018 -0800

    MINOR: Add async and different sync startup modes in connect service test class
    
    Allow Connect Service in system tests to start asynchronously.
    
    Specifically, allow for three startup conditions:
    1. No condition - start async and return immediately.
    2. Semi-async - start immediately after plugins have been discovered successfully.
    3. Sync - start returns after the worker has completed startup. This is the current mode,
but its condition is improved by checking that the port of Connect's REST interface is open,
rather than that a log line has appeared in the logs.
    
    An associated system test run has been started here:
    https://jenkins.confluent.io/job/system-test-confluent-platform-branch-builder/586/
    
    ewencp rhauch, I'd appreciate your review.
    
    Author: Konstantine Karantasis <konstantine@confluent.io>
    
    Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #4423 from kkonstantine/MINOR-Add-async-and-different-sync-startup-modes-in-ConnectService-test-class
---
 tests/kafkatest/services/connect.py | 72 ++++++++++++++++++++++++++++++-------
 1 file changed, 60 insertions(+), 12 deletions(-)

diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 399e53c..d7ef204 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -20,6 +20,7 @@ import signal
 import time
 
 import requests
+from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.errors import DucktapeError
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
@@ -39,6 +40,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     STDERR_FILE = os.path.join(PERSISTENT_ROOT, "connect.stderr")
     LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "connect-log4j.properties")
     PID_FILE = os.path.join(PERSISTENT_ROOT, "connect.pid")
+    CONNECT_REST_PORT = 8083
+
+    # Currently the Connect worker supports waiting on three modes:
+    STARTUP_MODE_INSTANT = 'INSTANT'
+    """STARTUP_MODE_INSTANT: Start Connect worker and return immediately"""
+    STARTUP_MODE_LOAD = 'LOAD'
+    """STARTUP_MODE_LOAD: Start Connect worker and return after discovering and loading plugins"""
+    STARTUP_MODE_LISTEN = 'LISTEN'
+    """STARTUP_MODE_LISTEN: Start Connect worker and return after opening the REST port."""
 
     logs = {
         "connect_log": {
@@ -57,6 +67,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
         self.files = files
+        self.startup_mode = self.STARTUP_MODE_LISTEN
         self.environment = {}
 
     def pids(self, node):
@@ -76,6 +87,38 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         self.config_template_func = config_template_func
         self.connector_config_templates = connector_config_templates
 
+    def listening(self, node):
+        try:
+            cmd = "nc -z %s %s" % (node.account.hostname, self.CONNECT_REST_PORT)
+            node.account.ssh_output(cmd, allow_fail=False)
+            self.logger.debug("Connect worker started accepting connections at: '%s:%s')",
node.account.hostname,
+                              self.CONNECT_REST_PORT)
+            return True
+        except (RemoteCommandError, ValueError) as e:
+            return False
+
+    def start(self, mode=STARTUP_MODE_LISTEN):
+        self.startup_mode = mode
+        super(ConnectServiceBase, self).start()
+
+    def start_and_return_immediately(self, node, worker_type, remote_connector_configs):
+        cmd = self.start_cmd(node, remote_connector_configs)
+        self.logger.debug("Connect %s command: %s", worker_type, cmd)
+        node.account.ssh(cmd)
+
+    def start_and_wait_to_load_plugins(self, node, worker_type, remote_connector_configs):
+        with node.account.monitor_log(self.LOG_FILE) as monitor:
+            self.start_and_return_immediately(node, worker_type, remote_connector_configs)
+            monitor.wait_until('Kafka version', timeout_sec=60,
+                               err_msg="Never saw message indicating Kafka Connect finished
startup on node: " +
+                                       "%s in condition mode: %s" % (str(node.account), self.startup_mode))
+
+    def start_and_wait_to_start_listening(self, node, worker_type, remote_connector_configs):
+        self.start_and_return_immediately(node, worker_type, remote_connector_configs)
+        wait_until(lambda: self.listening(node), timeout_sec=60,
+                   err_msg="Kafka Connect failed to start on node: %s in condition mode:
%s" %
+                   (str(node.account), self.startup_mode))
+
     def stop_node(self, node, clean_shutdown=True):
         self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka
Connect on " + str(node.account))
         pids = self.pids(node)
@@ -192,7 +235,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         raise exception_to_throw
 
     def _base_url(self, node):
-        return 'http://' + node.account.externally_routable_ip + ':' + '8083'
+        return 'http://' + node.account.externally_routable_ip + ':' + str(self.CONNECT_REST_PORT)
 
 
 class ConnectStandaloneService(ConnectServiceBase):
@@ -229,11 +272,13 @@ class ConnectStandaloneService(ConnectServiceBase):
             remote_connector_configs.append(target_file)
 
         self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
-        with node.account.monitor_log(self.LOG_FILE) as monitor:
-            cmd = self.start_cmd(node, remote_connector_configs)
-            self.logger.debug("Connect standalone command: %s", cmd)
-            node.account.ssh(cmd)
-            monitor.wait_until('Kafka Connect started', timeout_sec=60, err_msg="Never saw
message indicating Kafka Connect finished startup on " + str(node.account))
+        if self.startup_mode == self.STARTUP_MODE_LOAD:
+            self.start_and_wait_to_load_plugins(node, 'standalone', remote_connector_configs)
+        elif self.startup_mode == self.STARTUP_MODE_INSTANT:
+            self.start_and_return_immediately(node, 'standalone', remote_connector_configs)
+        else:
+            # The default mode is to wait until the complete startup of the worker
+            self.start_and_wait_to_start_listening(node, 'standalone', remote_connector_configs)
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
@@ -249,7 +294,8 @@ class ConnectDistributedService(ConnectServiceBase):
         self.configs_topic = configs_topic
         self.status_topic = status_topic
 
-    def start_cmd(self, node):
+    # connector_configs argument is intentionally ignored in distributed service.
+    def start_cmd(self, node, connector_configs):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
         cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
         for envvar in self.environment:
@@ -268,11 +314,13 @@ class ConnectDistributedService(ConnectServiceBase):
             raise DucktapeError("Config files are not valid in distributed mode, submit connectors
via the REST API")
 
         self.logger.info("Starting Kafka Connect distributed process on " + str(node.account))
-        with node.account.monitor_log(self.LOG_FILE) as monitor:
-            cmd = self.start_cmd(node)
-            self.logger.debug("Connect distributed command: %s", cmd)
-            node.account.ssh(cmd)
-            monitor.wait_until('Kafka Connect started', timeout_sec=60, err_msg="Never saw
message indicating Kafka Connect finished startup on " + str(node.account))
+        if self.startup_mode == self.STARTUP_MODE_LOAD:
+            self.start_and_wait_to_load_plugins(node, 'distributed', '')
+        elif self.startup_mode == self.STARTUP_MODE_INSTANT:
+            self.start_and_return_immediately(node, 'distributed', '')
+        else:
+            # The default mode is to wait until the complete startup of the worker
+            self.start_and_wait_to_start_listening(node, 'distributed', '')
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.

Mime
View raw message