kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: MINOR: refactor streams system test class hierarchy
Date Wed, 18 Jan 2017 19:55:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 638a82b7b -> dbca4a3b6


MINOR: refactor streams system test class hierarchy

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Eno Thereska, Guozhang Wang

Closes #2392 from mjsax/minor-system-test-rework

(cherry picked from commit d8a77560c2fa2c209353e3ba2366ad3d4cfdf22c)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dbca4a3b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dbca4a3b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dbca4a3b

Branch: refs/heads/0.10.2
Commit: dbca4a3b60d6039558c6fec99c86400c2131f9e8
Parents: 638a82b
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Jan 18 11:55:23 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jan 18 11:55:34 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/perf/SimpleBenchmark.java     |   2 +-
 .../services/performance/streams_performance.py | 112 ++-----------------
 tests/kafkatest/services/streams.py             |  54 +++++----
 .../streams/streams_shutdown_deadlock_test.py   |   3 +-
 4 files changed, 43 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
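
In short, the commit pulls the generic service plumbing (logging, PID handling, start/stop/wait, and the parameterized launch command) out of StreamsSmokeTestBaseService into a new StreamsTestBaseService, and turns both the smoke test services and the simple benchmark service into thin subclasses that only supply a Java test class name plus its user arguments. The following sketch condenses the resulting Python class hierarchy from the diff below (bodies elided); it is a reading aid, not part of the patch:

class StreamsTestBaseService(KafkaPathResolverMixin, Service):
    """Generic runner: launches any streams test class via kafka-run-class.sh."""
    def __init__(self, test_context, kafka, streams_class_name, user_test_args):
        ...  # stores kafka plus the class name and user args that start_cmd renders

class StreamsSmokeTestBaseService(StreamsTestBaseService):
    def __init__(self, test_context, kafka, command):
        super(StreamsSmokeTestBaseService, self).__init__(
            test_context, kafka,
            "org.apache.kafka.streams.smoketest.StreamsSmokeTest", command)

class StreamsSimpleBenchmarkService(StreamsTestBaseService):   # lives in streams_performance.py
    def __init__(self, test_context, kafka, numrecs):
        super(StreamsSimpleBenchmarkService, self).__init__(
            test_context, kafka,
            "org.apache.kafka.streams.perf.SimpleBenchmark", numrecs)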


http://git-wip-us.apache.org/repos/asf/kafka/blob/dbca4a3b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7ba6161..db3752d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -111,7 +111,7 @@ public class SimpleBenchmark {
         rocksdbDir.mkdir();
 
         // Note: this output is needed for automated tests and must not be removed
-        System.out.println("SimpleBenchmark instance started");
+        System.out.println("StreamsTest instance started");
         System.out.println("kafka=" + kafka);
         System.out.println("zookeeper=" + zookeeper);
         System.out.println("stateDir=" + stateDir);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dbca4a3b/tests/kafkatest/services/performance/streams_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index 0af13f9..e9fa2a7 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -13,115 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os.path
-import signal
+from kafkatest.services.streams import StreamsTestBaseService
 
-from ducktape.services.service import Service
-from ducktape.utils.util import wait_until
-
-from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
 #
 # Class used to start the simple Kafka Streams benchmark
 #
-class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
+class StreamsSimpleBenchmarkService(StreamsTestBaseService):
     """Base class for simple Kafka Streams benchmark"""
 
-    PERSISTENT_ROOT = "/mnt/streams"
-    # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
-    LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log")
-    STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout")
-    STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr")
-    LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
-    PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
-
-    logs = {
-        "streams_log": {
-            "path": LOG_FILE,
-            "collect_default": True},
-        "streams_stdout": {
-            "path": STDOUT_FILE,
-            "collect_default": True},
-        "streams_stderr": {
-            "path": STDERR_FILE,
-            "collect_default": True},
-    }
-
-    def __init__(self, context, kafka, numrecs):
-        super(StreamsSimpleBenchmarkService, self).__init__(context, 1)
-        self.kafka = kafka
-        self.numrecs = numrecs
-
-    @property
-    def node(self):
-        return self.nodes[0]
-
-    def pids(self, node):
-        try:
-            return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)]
-        except:
-            return []
-
-    def stop_node(self, node, clean_shutdown=True):
-        self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping SimpleBenchmark on " + str(node.account))
-        pids = self.pids(node)
-        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
-
-        for pid in pids:
-            node.account.signal(pid, sig, allow_fail=True)
-        if clean_shutdown:
-            for pid in pids:
-                wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit")
-
-        node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
-
-    def wait(self):
-        for node in self.nodes:
-            for pid in self.pids(node):
-                wait_until(lambda: not node.account.alive(pid), timeout_sec=600, backoff_sec=1, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit")
-
-    def clean_node(self, node):
-        node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
-
-    def start_cmd(self, node):
-        args = {}
-        args['kafka'] = self.kafka.bootstrap_servers()
-        args['zk'] = self.kafka.zk.connect_setting()
-        args['state_dir'] = self.PERSISTENT_ROOT
-        args['numrecs'] = self.numrecs
-        args['stdout'] = self.STDOUT_FILE
-        args['stderr'] = self.STDERR_FILE
-        args['pidfile'] = self.PID_FILE
-        args['log4j'] = self.LOG4J_CONFIG_FILE
-        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
-
-        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
-              "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.perf.SimpleBenchmark
" \
-              " %(kafka)s %(zk)s %(state_dir)s %(numrecs)s " \
-              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3>
%(pidfile)s" % args
-
-        return cmd
-
-    def start_node(self, node):
-        node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
-
-        node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))
-
-        self.logger.info("Starting SimpleBenchmark process on " + str(node.account))
-        results = {}
-        with node.account.monitor_log(self.STDOUT_FILE) as monitor:
-            node.account.ssh(self.start_cmd(node))
-            monitor.wait_until('SimpleBenchmark instance started', timeout_sec=15, err_msg="Never saw message indicating SimpleBenchmark finished startup on " + str(node.account))
-
-        if len(self.pids(node)) == 0:
-            raise RuntimeError("No process ids recorded")
-
-    def collect_data(self, node):
-        # Collect the data and return it to the framework
-        output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE)
-        data = {}
-        for line in output:
-            parts = line.split(':')
-            data[parts[0]] = float(parts[1])
-        return data
+    def __init__(self, test_context, kafka, numrecs):
+        super(StreamsSimpleBenchmarkService, self).__init__(test_context,
+                                                            kafka,
+                                                            "org.apache.kafka.streams.perf.SimpleBenchmark",
+                                                            numrecs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/dbca4a3b/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 87e4414..9250cd7 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -22,8 +22,8 @@ from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
 
-class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
-    """Base class for Streams Smoke Test services providing some common settings and functionality"""
+class StreamsTestBaseService(KafkaPathResolverMixin, Service):
+    """Base class for Streams Test services providing some common settings and functionality"""
 
     PERSISTENT_ROOT = "/mnt/streams"
     # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
@@ -45,10 +45,11 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
             "collect_default": True},
     }
 
-    def __init__(self, context, kafka, command):
-        super(StreamsSmokeTestBaseService, self).__init__(context, 1)
+    def __init__(self, test_context, kafka, streams_class_name, user_test_args):
+        super(StreamsTestBaseService, self).__init__(test_context, 1)
         self.kafka = kafka
-        self.args = {'command': command}
+        self.args = {'streams_class_name': streams_class_name,
+                     'user_test_args': user_test_args}
 
     @property
     def node(self):
@@ -65,7 +66,7 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
             self.stop_node(node, clean_shutdown)
 
     def stop_node(self, node, clean_shutdown=True):
-        self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Streams Smoke Test on " + str(node.account))
+        self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Streams Test on " + str(node.account))
         pids = self.pids(node)
         sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
 
@@ -73,7 +74,7 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
             node.account.signal(pid, sig, allow_fail=True)
         if clean_shutdown:
             for pid in pids:
-                wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Streams
Smoke Test process on " + str(node.account) + " took too long to exit")
+                wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Streams
Test process on " + str(node.account) + " took too long to exit")
 
         node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
 
@@ -95,8 +96,11 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
 
     def wait(self, timeout_sec=360):
         for node in self.nodes:
-            for pid in self.pids(node):
-                wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit")
+            self.wait_node(node, timeout_sec)
+
+    def wait_node(self, node, timeout_sec=None):
+        for pid in self.pids(node):
+            wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, err_msg="Streams Test process on " + str(node.account) + " took too long to exit")
 
     def clean_node(self, node):
         node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
@@ -105,7 +109,6 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
     def start_cmd(self, node):
         args = self.args.copy()
         args['kafka'] = self.kafka.bootstrap_servers()
-        args['zk'] = self.kafka.zk.connect_setting()
         args['state_dir'] = self.PERSISTENT_ROOT
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
@@ -114,8 +117,8 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
         args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
 
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
-              "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.smoketest.StreamsSmokeTest
" \
-              " %(command)s %(kafka)s %(zk)s %(state_dir)s " \
+              "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
+              " %(kafka)s %(state_dir)s %(user_test_args)s" \
               " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3>
%(pidfile)s" % args
 
         return cmd
@@ -125,24 +128,35 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
 
         node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))
 
-        self.logger.info("Starting StreamsSmokeTest process on " + str(node.account))
+        self.logger.info("Starting StreamsTest process on " + str(node.account))
         with node.account.monitor_log(self.STDOUT_FILE) as monitor:
             node.account.ssh(self.start_cmd(node))
-            monitor.wait_until('StreamsSmokeTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsSmokeTest finished startup on " + str(node.account))
+            monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
 
 
+class StreamsSmokeTestBaseService(StreamsTestBaseService):
+    """Base class for Streams Smoke Test services providing some common settings and functionality"""
+
+    def __init__(self, test_context, kafka, command):
+        super(StreamsSmokeTestBaseService, self).__init__(test_context,
+                                                          kafka,
+                                                          "org.apache.kafka.streams.smoketest.StreamsSmokeTest",
+                                                          command)
+
+
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
-    def __init__(self, context, kafka):
-        super(StreamsSmokeTestDriverService, self).__init__(context, kafka, "run")
+    def __init__(self, test_context, kafka):
+        super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run")
 
 
 class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
-    def __init__(self, context, kafka):
-        super(StreamsSmokeTestJobRunnerService, self).__init__(context, kafka, "process")
+    def __init__(self, test_context, kafka):
+        super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process")
+
 
 class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
-    def __init__(self, context, kafka):
-        super(StreamsSmokeTestShutdownDeadlockService, self).__init__(context, kafka, "close-deadlock-test")
+    def __init__(self, test_context, kafka):
+        super(StreamsSmokeTestShutdownDeadlockService, self).__init__(test_context, kafka, "close-deadlock-test")

http://git-wip-us.apache.org/repos/asf/kafka/blob/dbca4a3b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
index 5e4e7f2..482da9c 100644
--- a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
+++ b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
@@ -13,11 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import ignore
-
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.streams import StreamsSmokeTestShutdownDeadlockService
 
+
 class StreamsShutdownDeadlockTest(KafkaTest):
     """
     Simple test of Kafka Streams.

