kafka-commits mailing list archives

From ewe...@apache.org
Subject [2/2] kafka git commit: KAFKA-3592: System test - configurable paths
Date Fri, 06 May 2016 18:12:57 GMT
KAFKA-3592: System test - configurable paths

This patch adds logic for the following:
- remove hard-coded paths to various scripts and jars in kafkatest service classes
- provide a mechanism for overriding path resolution logic with a "pluggable" path resolver class
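
The override mechanism works by dynamically importing a resolver class from a fully qualified classname supplied via ducktape's `--globals` flag. A minimal sketch of that lookup follows (helper names here are illustrative; the real factory is `create_path_resolver` in `kafka_path.py`):

```python
import importlib

# Default used when no override is supplied via ducktape globals
DEFAULT_RESOLVER = "kafkatest.directory_layout.kafka_path.KafkaSystemTestPathResolver"

def load_class(fully_qualified_name):
    """Import a class given its fully qualified dotted name."""
    module_name, class_name = fully_qualified_name.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)

def pick_resolver_name(globals_dict, key="kafka-path-resolver"):
    """Use the override from ducktape globals if present, else the default."""
    return globals_dict.get(key, DEFAULT_RESOLVER)

# Example: resolve a stdlib class the same way the patch resolves a path resolver
cls = load_class("collections.OrderedDict")
print(cls.__name__)  # OrderedDict
```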

Author: Geoff Anderson <geoff@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1245 from granders/configurable-install-path

(cherry picked from commit 54092c12ed276b4bf91741e3c7fd315443f3c0b1)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aabf8251
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aabf8251
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aabf8251

Branch: refs/heads/0.10.0
Commit: aabf825145645828075f341eba1483384df18f26
Parents: cda571d
Author: Geoff Anderson <geoff@confluent.io>
Authored: Fri May 6 11:10:27 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri May 6 11:11:51 2016 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 tests/README.md                                 |  15 ++
 .../kafkatest/benchmarks/core/benchmark_test.py |   9 +-
 tests/kafkatest/directory_layout/__init__.py    |  14 ++
 tests/kafkatest/directory_layout/kafka_path.py  | 137 +++++++++++++++++++
 .../sanity_checks/test_console_consumer.py      |  18 ++-
 .../sanity_checks/test_kafka_version.py         |   4 +-
 .../sanity_checks/test_performance_services.py  |   6 +-
 .../sanity_checks/test_verifiable_producer.py   |   6 +-
 tests/kafkatest/services/connect.py             |  18 ++-
 tests/kafkatest/services/console_consumer.py    |  19 ++-
 tests/kafkatest/services/kafka/directory.py     |  32 -----
 tests/kafkatest/services/kafka/kafka.py         |  74 +++++-----
 tests/kafkatest/services/kafka/version.py       |  69 ----------
 .../kafkatest/services/kafka_log4j_appender.py  |   8 +-
 tests/kafkatest/services/mirror_maker.py        |  13 +-
 tests/kafkatest/services/monitor/jmx.py         |   9 +-
 .../performance/consumer_performance.py         |  10 +-
 .../services/performance/end_to_end_latency.py  |  15 +-
 .../services/performance/performance.py         |   3 +-
 .../performance/producer_performance.py         |  21 +--
 .../services/replica_verification_tool.py       |   8 +-
 tests/kafkatest/services/security/kafka_acls.py |  10 +-
 tests/kafkatest/services/security/minikdc.py    |  37 ++---
 .../kafkatest/services/simple_consumer_shell.py |   8 +-
 tests/kafkatest/services/streams.py             |  13 +-
 tests/kafkatest/services/verifiable_consumer.py |  18 +--
 tests/kafkatest/services/verifiable_producer.py |  29 ++--
 tests/kafkatest/services/zookeeper.py           |  25 ++--
 .../tests/client/message_format_change_test.py  |  12 +-
 .../core/compatibility_test_new_broker_test.py  |  13 +-
 .../tests/core/security_rolling_upgrade_test.py |   3 +-
 tests/kafkatest/tests/core/upgrade_test.py      |   9 +-
 .../core/zookeeper_security_upgrade_test.py     |   4 +-
 tests/kafkatest/version.py                      |  80 +++++++++++
 tests/setup.cfg                                 |  30 ++++
 tests/setup.py                                  |  29 +++-
 tests/unit/__init__.py                          |  14 ++
 tests/unit/directory_layout/__init__.py         |  14 ++
 .../directory_layout/check_project_paths.py     |  90 ++++++++++++
 tests/unit/setup.cfg                            |  23 ++++
 tests/unit/version/__init__.py                  |  15 ++
 tests/unit/version/check_version.py             |  33 +++++
 43 files changed, 712 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 50e1f85..73972e6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,5 +37,6 @@ results
 tests/results
 .ducktape
 tests/.ducktape
+.cache
 
 docs/generated/

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/README.md
----------------------------------------------------------------------
diff --git a/tests/README.md b/tests/README.md
index 143711d..8a779a4 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -134,3 +134,18 @@ the test driver machine.
 
 * To halt your workers without destroying persistent state, run `vagrant halt`. Run `vagrant destroy -f` to destroy all traces of your workers.
 
+Unit Tests
+----------
+The system tests have unit tests! The various services in the python `kafkatest` module are reasonably complex, and intended to be reusable. Hence we have unit tests
+for the system service classes.
+
+Where are the unit tests?
+* The kafkatest unit tests are located under kafka/tests/unit
+
+How do I run the unit tests?
+* cd kafka/tests # The base system test directory
+* python setup.py test
+
+How can I add a unit test?
+* Follow the naming conventions - module name starts with "check", class name begins with "Check", test method name begins with "check"
+* These naming conventions are defined in "setup.cfg". We use "check" to distinguish unit tests from system tests, which use "test" in the various names.

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/benchmarks/core/benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py
index 83f4b2a..4dbf902 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -13,16 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark import matrix
+from ducktape.mark import parametrize
 from ducktape.services.service import Service
 from ducktape.tests.test import Test
-from ducktape.mark import parametrize
-from ducktape.mark import matrix
 
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import TRUNK, KafkaVersion
 from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService, throughput, latency, compute_aggregate_throughput
-
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import TRUNK, KafkaVersion
 
 TOPIC_REP_ONE = "topic-replication-factor-one"
 TOPIC_REP_THREE = "topic-replication-factor-three"

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/directory_layout/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/directory_layout/__init__.py b/tests/kafkatest/directory_layout/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/kafkatest/directory_layout/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/directory_layout/kafka_path.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/directory_layout/kafka_path.py b/tests/kafkatest/directory_layout/kafka_path.py
new file mode 100644
index 0000000..0e60aff
--- /dev/null
+++ b/tests/kafkatest/directory_layout/kafka_path.py
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import importlib
+import os
+
+from kafkatest.version import get_version, KafkaVersion, TRUNK
+
+
+"""This module serves a few purposes:
+
+First, it gathers information about path layout in a single place, and second, it
+makes the layout of the Kafka installation pluggable, so that users are not forced
+to use the layout assumed in the KafkaPathResolver class.
+
+To run system tests using your own path resolver, use for example:
+
+ducktape <TEST_PATH> --globals '{"kafka-path-resolver": "my.path.resolver.CustomResolverClass"}'
+"""
+
+SCRATCH_ROOT = "/mnt"
+KAFKA_INSTALL_ROOT = "/opt"
+KAFKA_PATH_RESOLVER_KEY = "kafka-path-resolver"
+KAFKA_PATH_RESOLVER = "kafkatest.directory_layout.kafka_path.KafkaSystemTestPathResolver"
+
+# Variables for jar path resolution
+CORE_JAR_NAME = "core"
+CORE_LIBS_JAR_NAME = "core-libs"
+CORE_DEPENDANT_TEST_LIBS_JAR_NAME = "core-dependant-testlibs"
+TOOLS_JAR_NAME = "tools"
+TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME = "tools-dependant-libs"
+
+JARS = {
+    "trunk": {
+        CORE_JAR_NAME: "core/build/*/*.jar",
+        CORE_LIBS_JAR_NAME: "core/build/libs/*.jar",
+        CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "core/build/dependant-testlibs/*.jar",
+        TOOLS_JAR_NAME: "tools/build/libs/kafka-tools*.jar",
+        TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "tools/build/dependant-libs*/*.jar"
+    }
+}
+
+
+def create_path_resolver(context, project="kafka"):
+    """Factory for generating a path resolver class
+
+    This will first check for a fully qualified path resolver classname in context.globals.
+
+    If present, construct a new instance, else default to KafkaSystemTestPathResolver
+    """
+    assert project is not None
+
+    if KAFKA_PATH_RESOLVER_KEY in context.globals:
+        resolver_fully_qualified_classname = context.globals[KAFKA_PATH_RESOLVER_KEY]
+    else:
+        resolver_fully_qualified_classname = KAFKA_PATH_RESOLVER
+
+    # Using the fully qualified classname, import the resolver class
+    (module_name, resolver_class_name) = resolver_fully_qualified_classname.rsplit('.', 1)
+    cluster_mod = importlib.import_module(module_name)
+    path_resolver_class = getattr(cluster_mod, resolver_class_name)
+    path_resolver = path_resolver_class(context, project)
+
+    return path_resolver
+
+
+class KafkaPathResolverMixin(object):
+    """Mixin to automatically provide pluggable path resolution functionality to any class using it.
+
+    Keep life simple, and don't add a constructor to this class:
+    Since use of a mixin entails multiple inheritance, it is *much* simpler to reason about the interaction of this
+    class with subclasses if we don't have to worry about method resolution order, constructor signatures etc.
+    """
+
+    @property
+    def path(self):
+        if not hasattr(self, "_path"):
+            setattr(self, "_path", create_path_resolver(self.context, "kafka"))
+            if hasattr(self.context, "logger") and self.context.logger is not None:
+                self.context.logger.debug("Using path resolver %s" % self._path.__class__.__name__)
+
+        return self._path
+
+
+class KafkaSystemTestPathResolver(object):
+    """Path resolver for Kafka system tests which assumes the following layout:
+
+        /opt/kafka-trunk        # Current version of kafka under test
+        /opt/kafka-0.9.0.1      # Example of an older version of kafka installed from tarball
+        /opt/kafka-<version>    # Other previous versions of kafka
+        ...
+    """
+    def __init__(self, context, project="kafka"):
+        self.context = context
+        self.project = project
+
+    def home(self, node_or_version=TRUNK):
+        version = self._version(node_or_version)
+        home_dir = self.project
+        if version is not None:
+            home_dir += "-%s" % str(version)
+
+        return os.path.join(KAFKA_INSTALL_ROOT, home_dir)
+
+    def bin(self, node_or_version=TRUNK):
+        version = self._version(node_or_version)
+        return os.path.join(self.home(version), "bin")
+
+    def script(self, script_name, node_or_version=TRUNK):
+        version = self._version(node_or_version)
+        return os.path.join(self.bin(version), script_name)
+
+    def jar(self, jar_name, node_or_version=TRUNK):
+        version = self._version(node_or_version)
+        return os.path.join(self.home(version), JARS[str(version)][jar_name])
+
+    def scratch_space(self, service_instance):
+        return os.path.join(SCRATCH_ROOT, service_instance.service_id)
+
+    def _version(self, node_or_version):
+        if isinstance(node_or_version, KafkaVersion):
+            return node_or_version
+        else:
+            return get_version(node_or_version)
+
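
As a standalone sketch, the resolver's layout logic reduces to joining an install root, a versioned home directory, and a `bin/` subdirectory. The constants mirror the module above; the free-function form is illustrative (the real code keeps these as methods on `KafkaSystemTestPathResolver`):

```python
import os

KAFKA_INSTALL_ROOT = "/opt"

def home(project="kafka", version="trunk"):
    """Versioned install dir: /opt/kafka-trunk, /opt/kafka-0.9.0.1, ..."""
    return os.path.join(KAFKA_INSTALL_ROOT, "%s-%s" % (project, version))

def script(script_name, version="trunk"):
    """Full path to a script under the versioned bin/ directory."""
    return os.path.join(home(version=version), "bin", script_name)

print(script("kafka-server-start.sh"))
# /opt/kafka-trunk/bin/kafka-server-start.sh
```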

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index d6a152a..773a561 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -13,21 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import time
+
+from ducktape.mark import matrix
+from ducktape.mark import parametrize
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
-from ducktape.mark import matrix
 
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_8_2
 from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils.remote_account import line_count, file_exists
+from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.security.security_config import SecurityConfig
-
-
-import time
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.utils.remote_account import line_count, file_exists
+from kafkatest.version import LATEST_0_8_2
 
 
 class ConsoleConsumerTest(Test):

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/sanity_checks/test_kafka_version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_kafka_version.py b/tests/kafkatest/sanity_checks/test_kafka_version.py
index f5f5d5f..b33c590 100644
--- a/tests/kafkatest/sanity_checks/test_kafka_version.py
+++ b/tests/kafkatest/sanity_checks/test_kafka_version.py
@@ -15,10 +15,10 @@
 
 from ducktape.tests.test import Test
 
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService, config_property
-from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
+from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.utils import is_version
+from kafkatest.version import LATEST_0_8_2, TRUNK
 
 
 class KafkaVersionTest(Test):

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/sanity_checks/test_performance_services.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py
index 16d5d32..94a61bc 100644
--- a/tests/kafkatest/sanity_checks/test_performance_services.py
+++ b/tests/kafkatest/sanity_checks/test_performance_services.py
@@ -13,14 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.tests.test import Test
 from ducktape.mark import parametrize
+from ducktape.tests.test import Test
 
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2, LATEST_0_9, KafkaVersion
 from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
 from kafkatest.services.performance import latency, compute_aggregate_throughput
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import TRUNK, LATEST_0_8_2, LATEST_0_9, KafkaVersion
 
 
 class PerformanceServiceTest(Test):

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/sanity_checks/test_verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index f1bc2a0..23932f3 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -14,15 +14,15 @@
 # limitations under the License.
 
 
+from ducktape.mark import parametrize
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
 
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
 from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.utils import is_version
+from kafkatest.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
 
 
 class TestVerifiableProducer(Test):

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 51dade3..1eb2dd5 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -13,14 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
+import os.path
+import random
+import signal
+
+import requests
+from ducktape.errors import DucktapeError
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
-from ducktape.errors import DucktapeError
 
-from kafkatest.services.kafka.directory import kafka_dir
-import signal, random, requests, os.path, json
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+
 
-class ConnectServiceBase(Service):
+class ConnectServiceBase(KafkaPathResolverMixin, Service):
     """Base class for Kafka Connect services providing some common settings and functionality"""
 
     PERSISTENT_ROOT = "/mnt/connect"
@@ -156,7 +162,7 @@ class ConnectStandaloneService(ConnectServiceBase):
     def start_cmd(self, node, connector_configs):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
         cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
-        cmd += "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
+        cmd += "%s %s " % (self.path.script("connect-standalone.sh", node), self.CONFIG_FILE)
         cmd += " ".join(connector_configs)
         cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
         return cmd
@@ -195,7 +201,7 @@ class ConnectDistributedService(ConnectServiceBase):
     def start_cmd(self, node):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
         cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
-        cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
+        cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), self.CONFIG_FILE)
         cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
         return cmd
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 9c7f564..2bd093c 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -13,17 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.utils.util import wait_until
-from ducktape.services.background_thread import BackgroundThreadService
-
-from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2, LATEST_0_9, V_0_10_0_0
-from kafkatest.services.monitor.jmx import JmxMixin
-
 import itertools
 import os
 import subprocess
 
+from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.utils.util import wait_until
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.version import TRUNK, LATEST_0_8_2, LATEST_0_9, V_0_10_0_0
 
 """
 0.8.2.1 ConsoleConsumer options
@@ -66,7 +65,7 @@ Option                                  Description
 """
 
 
-class ConsoleConsumer(JmxMixin, BackgroundThreadService):
+class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService):
     # Root directory for persistent output
     PERSISTENT_ROOT = "/mnt/console_consumer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
@@ -165,7 +164,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         args['config_file'] = ConsoleConsumer.CONFIG_FILE
         args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
         args['jmx_port'] = self.jmx_port
-        args['kafka_dir'] = kafka_dir(node)
+        args['console_consumer'] = self.path.script("kafka-console-consumer.sh", node)
         args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
         args['kafka_opts'] = self.security_config.kafka_opts
 
@@ -173,7 +172,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
               "export LOG_DIR=%(log_dir)s; " \
               "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \
               "export KAFKA_OPTS=%(kafka_opts)s; " \
-              "/opt/%(kafka_dir)s/bin/kafka-console-consumer.sh " \
+              "%(console_consumer)s " \
               "--topic %(topic)s --consumer.config %(config_file)s" % args
 
         if self.new_consumer:

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/kafka/directory.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/directory.py b/tests/kafkatest/services/kafka/directory.py
deleted file mode 100644
index 59af1fc..0000000
--- a/tests/kafkatest/services/kafka/directory.py
+++ /dev/null
@@ -1,32 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-# "trunk" installation of kafka
-KAFKA_TRUNK = "kafka-trunk"
-
-
-def kafka_dir(node=None):
-    """Return name of kafka directory for the given node.
-
-    This provides a convenient way to support different versions of kafka or kafka tools running
-    on different nodes.
-    """
-    if node is None:
-        return KAFKA_TRUNK
-
-    if not hasattr(node, "version"):
-        return KAFKA_TRUNK
-
-    return "kafka-" + str(node.version)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index a74bb00..6ff7d0c 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -13,28 +13,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import collections
+import json
+import os.path
+import re
+import signal
+import subprocess
+import time
+
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 
 from config import KafkaConfig
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.kafka import config_property
-from kafkatest.services.kafka.version import TRUNK
-from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
-
 from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.security.minikdc import MiniKdc
-import json
-import re
-import signal
-import subprocess
-import time
-import os.path
-import collections
+from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.version import TRUNK
 
 Port = collections.namedtuple('Port', ['name', 'number', 'open'])
 
-class KafkaService(JmxMixin, Service):
+
+class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
     PERSISTENT_ROOT = "/mnt"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "kafka.log")
@@ -84,6 +85,7 @@ class KafkaService(JmxMixin, Service):
         self.topics = topics
         self.minikdc = None
         self.authorizer_class_name = authorizer_class_name
+
         #
         # In a heavily loaded and not very fast machine, it is
         # sometimes necessary to give more time for the zk client
@@ -174,7 +176,11 @@ class KafkaService(JmxMixin, Service):
         cmd = "export JMX_PORT=%d; " % self.jmx_port
         cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG
         cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
-        cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh %s 1>> %s 2>> %s &" % (KafkaService.CONFIG_FILE, KafkaService.STDOUT_CAPTURE, KafkaService.STDERR_CAPTURE)
+        cmd += "%s %s 1>> %s 2>> %s &" % \
+               (self.path.script("kafka-server-start.sh", node),
+                KafkaService.CONFIG_FILE,
+                KafkaService.STDOUT_CAPTURE,
+                KafkaService.STDERR_CAPTURE)
         return cmd
 
     def start_node(self, node):
@@ -239,8 +245,9 @@ class KafkaService(JmxMixin, Service):
         if node is None:
             node = self.nodes[0]
         self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
+        kafka_topic_script = self.path.script("kafka-topics.sh", node)
 
-        cmd = "/opt/%s/bin/kafka-topics.sh " % kafka_dir(node)
+        cmd = kafka_topic_script + " "
         cmd += "--zookeeper %(zk_connect)s --create --topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
                 'zk_connect': self.zk.connect_setting(),
                 'topic': topic_cfg.get("topic"),
@@ -263,8 +270,8 @@ class KafkaService(JmxMixin, Service):
     def describe_topic(self, topic, node=None):
         if node is None:
             node = self.nodes[0]
-        cmd = "/opt/%s/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \
-              (kafka_dir(node), self.zk.connect_setting(), topic)
+        cmd = "%s --zookeeper %s --topic %s --describe" % \
+              (self.path.script("kafka-topics.sh", node), self.zk.connect_setting(), topic)
         output = ""
         for line in node.account.ssh_capture(cmd):
             output += line
@@ -274,8 +281,8 @@ class KafkaService(JmxMixin, Service):
         if node is None:
             node = self.nodes[0]
         self.logger.info("Altering message format version for topic %s with format %s", topic, msg_format_version)
-        cmd = "/opt/%s/bin/kafka-configs.sh --zookeeper %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
-              (kafka_dir(node), self.zk.connect_setting(), topic, msg_format_version)
+        cmd = "%s --zookeeper %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
+              (self.path.script("kafka-configs.sh", node), self.zk.connect_setting(), topic, msg_format_version)
         self.logger.info("Running alter message format command...\n%s" % cmd)
         node.account.ssh(cmd)
 
@@ -322,7 +329,7 @@ class KafkaService(JmxMixin, Service):
 
         # create command
         cmd = "echo %s > %s && " % (json_str, json_file)
-        cmd += "/opt/%s/bin/kafka-reassign-partitions.sh " % kafka_dir(node)
+        cmd += "%s " % self.path.script("kafka-reassign-partitions.sh", node)
         cmd += "--zookeeper %s " % self.zk.connect_setting()
         cmd += "--reassignment-json-file %s " % json_file
         cmd += "--verify "
@@ -355,7 +362,7 @@ class KafkaService(JmxMixin, Service):
 
         # create command
         cmd = "echo %s > %s && " % (json_str, json_file)
-        cmd += "/opt/%s/bin/kafka-reassign-partitions.sh " % kafka_dir(node)
+        cmd += "%s " % self.path.script("kafka-reassign-partitions.sh", node)
         cmd += "--zookeeper %s " % self.zk.connect_setting()
         cmd += "--reassignment-json-file %s " % json_file
         cmd += "--execute"
@@ -386,8 +393,8 @@ class KafkaService(JmxMixin, Service):
 
             # Check each data file to see if it contains the messages we want
             for log in files:
-                cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files %s " \
-                      "| grep -E \"%s\"" % (kafka_dir(node), log.strip(), payload_match)
+                cmd = "%s kafka.tools.DumpLogSegments --print-data-log --files %s | grep -E \"%s\"" % \
+                      (self.path.script("kafka-run-class.sh", node), log.strip(), payload_match)
 
                 for line in node.account.ssh_capture(cmd, allow_fail=True):
                     for val in messages:
@@ -429,6 +436,7 @@ class KafkaService(JmxMixin, Service):
         """
         if node is None:
             node = self.nodes[0]
+        consumer_group_script = self.path.script("kafka-consumer-groups.sh", node)
 
         if command_config is None:
             command_config = ""
@@ -436,11 +444,12 @@ class KafkaService(JmxMixin, Service):
             command_config = "--command-config " + command_config
 
         if new_consumer:
-            cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --list" % \
-                  (kafka_dir(node), self.bootstrap_servers(self.security_protocol), command_config)
+            cmd = "%s --new-consumer --bootstrap-server %s %s --list" % \
+                  (consumer_group_script,
+                   self.bootstrap_servers(self.security_protocol),
+                   command_config)
         else:
-            cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --list" % \
-                  (kafka_dir(node), self.zk.connect_setting(), command_config)
+            cmd = "%s --zookeeper %s %s --list" % (consumer_group_script, self.zk.connect_setting(), command_config)
         output = ""
         self.logger.debug(cmd)
         for line in node.account.ssh_capture(cmd):
@@ -454,6 +463,7 @@ class KafkaService(JmxMixin, Service):
         """
         if node is None:
             node = self.nodes[0]
+        consumer_group_script = self.path.script("kafka-consumer-groups.sh", node)
 
         if command_config is None:
             command_config = ""
@@ -461,11 +471,11 @@ class KafkaService(JmxMixin, Service):
             command_config = "--command-config " + command_config
 
         if new_consumer:
-            cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --group %s --describe" % \
-                  (kafka_dir(node), self.bootstrap_servers(self.security_protocol), command_config, group)
+            cmd = "%s --new-consumer --bootstrap-server %s %s --group %s --describe" % \
+                  (consumer_group_script, self.bootstrap_servers(self.security_protocol), command_config, group)
         else:
-            cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --group %s --describe" % \
-                  (kafka_dir(node), self.zk.connect_setting(), command_config, group)
+            cmd = "%s --zookeeper %s %s --group %s --describe" % \
+                  (consumer_group_script, self.zk.connect_setting(), command_config, group)
         output = ""
         self.logger.debug(cmd)
         for line in node.account.ssh_capture(cmd):
@@ -506,8 +516,8 @@ class KafkaService(JmxMixin, Service):
     def get_offset_shell(self, topic, partitions, max_wait_ms, offsets, time):
         node = self.nodes[0]
 
-        cmd = "/opt/%s/bin/" % kafka_dir(node)
-        cmd += "kafka-run-class.sh kafka.tools.GetOffsetShell"
+        cmd = self.path.script("kafka-run-class.sh", node)
+        cmd += " kafka.tools.GetOffsetShell"
         cmd += " --topic %s --broker-list %s --max-wait-ms %s --offsets %s --time %s" % (topic, self.bootstrap_servers(self.security_protocol), max_wait_ms, offsets, time)
 
         if partitions:

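[Editor's note] The recurring change in the hunks above swaps a hard-coded `/opt/<kafka_dir>/bin/<script>` prefix for a call to a path-resolver object. A minimal sketch of that pattern — class and attribute names here (`SystemTestPathResolver`, `install_root`) are illustrative only, not the actual kafkatest API:

```python
# Hypothetical sketch of the configurable-path pattern applied above;
# names are illustrative, not the real kafkatest resolver.
class SystemTestPathResolver(object):
    """Maps a script name to its location under a configurable install root."""
    def __init__(self, install_root="/opt/kafka-trunk"):
        self.install_root = install_root

    def script(self, script_name, node=None):
        # A real resolver could use `node` (e.g. its Kafka version) to pick
        # a version-specific install directory; this sketch ignores it.
        return "%s/bin/%s" % (self.install_root, script_name)

path = SystemTestPathResolver()
cmd = "%s --zookeeper %s --entity-type topics --list" % (
    path.script("kafka-configs.sh"), "localhost:2181")
```

Service code then builds commands from `self.path.script(...)` instead of interpolating `kafka_dir(node)`, so a nonstandard install layout only requires plugging in a different resolver.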
http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/kafka/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/version.py b/tests/kafkatest/services/kafka/version.py
deleted file mode 100644
index dc2582b..0000000
--- a/tests/kafkatest/services/kafka/version.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from kafkatest.utils import kafkatest_version
-
-from distutils.version import LooseVersion
-
-
-class KafkaVersion(LooseVersion):
-    """Container for kafka versions which makes versions simple to compare.
-
-    distutils.version.LooseVersion (and StrictVersion) has robust comparison and ordering logic.
-
-    Example:
-
-        v10 = KafkaVersion("0.10.0")
-        v9 = KafkaVersion("0.9.0.1")
-        assert v10 > v9  # assertion passes!
-    """
-    def __init__(self, version_string):
-        self.is_trunk = (version_string.lower() == "trunk")
-        if self.is_trunk:
-            # Since "trunk" may actually be a branch that is not trunk,
-            # use kafkatest_version() for comparison purposes,
-            # and track whether we're in "trunk" with a flag
-            version_string = kafkatest_version()
-
-            # Drop dev suffix if present
-            dev_suffix_index = version_string.find(".dev")
-            if dev_suffix_index >= 0:
-                version_string = version_string[:dev_suffix_index]
-
-        # Don't use the form super.(...).__init__(...) because
-        # LooseVersion is an "old style" python class
-        LooseVersion.__init__(self, version_string)
-
-    def __str__(self):
-        if self.is_trunk:
-            return "trunk"
-        else:
-            return LooseVersion.__str__(self)
-
-
-TRUNK = KafkaVersion("trunk")
-
-# 0.8.2.X versions
-V_0_8_2_1 = KafkaVersion("0.8.2.1")
-V_0_8_2_2 = KafkaVersion("0.8.2.2")
-LATEST_0_8_2 = V_0_8_2_2
-
-# 0.9.0.X versions
-V_0_9_0_0 = KafkaVersion("0.9.0.0")
-V_0_9_0_1 = KafkaVersion("0.9.0.1")
-LATEST_0_9 = V_0_9_0_1
-
-# 0.10.0.X versions
-V_0_10_0_0 = KafkaVersion("0.10.0.0")
-LATEST_0_10 = V_0_10_0_0

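[Editor's note] The deleted `version.py` (relocated to `kafkatest/version.py` by this patch) relies on `LooseVersion`'s component-wise ordering. A minimal stand-in for that ordering on purely numeric dotted versions — it deliberately does not reproduce `LooseVersion`'s handling of alphabetic components:

```python
def version_key(version_string):
    # Component-wise numeric ordering: 0.10.0 > 0.9.0.1 because 10 > 9
    # at the second component.
    return tuple(int(part) for part in version_string.split("."))

# Mirrors the docstring example in the deleted KafkaVersion class.
v10 = version_key("0.10.0")
v9 = version_key("0.9.0.1")
assert v10 > v9
```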
http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index c0af1a1..c50cab4 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -15,11 +15,11 @@
 
 from ducktape.services.background_thread import BackgroundThreadService
 
-from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.security.security_config import SecurityConfig
 
 
-class KafkaLog4jAppender(BackgroundThreadService):
+class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
 
     logs = {
         "producer_log": {
@@ -43,8 +43,8 @@ class KafkaLog4jAppender(BackgroundThreadService):
         node.account.ssh(cmd)
 
     def start_cmd(self, node):
-        cmd = "/opt/%s/bin/" % kafka_dir(node)
-        cmd += "kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender"
+        cmd = self.path.script("kafka-run-class.sh", node)
+        cmd += " org.apache.kafka.tools.VerifiableLog4jAppender"
         cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_protocol))
 
         if self.max_messages > 0:

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index cb4b2c1..fdaf4c8 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -13,14 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
+import subprocess
+
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 
-from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.security.security_config import SecurityConfig
-
-import os
-import subprocess
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
 """
 0.8.2.1 MirrorMaker options
@@ -56,7 +55,7 @@ Option                                  Description
 """
 
 
-class MirrorMaker(Service):
+class MirrorMaker(KafkaPathResolverMixin, Service):
 
     # Root directory for persistent output
     PERSISTENT_ROOT = "/mnt/mirror_maker"
@@ -114,7 +113,7 @@ class MirrorMaker(Service):
         cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
-        cmd += " /opt/%s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % kafka_dir(node)
+        cmd += " %s kafka.tools.MirrorMaker" % self.path.script("kafka-run-class.sh", node)
         cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
         cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
         cmd += " --offset.commit.interval.ms %s" % str(self.offset_commit_interval_ms)

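[Editor's note] MirrorMaker, like the other services in this patch, gains path resolution by listing `KafkaPathResolverMixin` ahead of the base `Service` class. A self-contained sketch of that cooperative-mixin shape, using stand-in classes (nothing below is the real ducktape/kafkatest API):

```python
class StubResolver(object):
    """Stand-in resolver; the real one is supplied by the test context."""
    def script(self, name, node=None):
        return "/opt/kafka-trunk/bin/" + name

class PathResolverMixin(object):
    # The mixin contributes a `path` attribute to any service class that
    # lists it before its Service base, via Python's MRO.
    @property
    def path(self):
        return StubResolver()

class Service(object):
    pass

class MirrorMakerSketch(PathResolverMixin, Service):
    def start_cmd(self, node=None):
        return "%s kafka.tools.MirrorMaker" % self.path.script("kafka-run-class.sh", node)
```

Because the mixin is first in the bases list, every subclass inherits `self.path` without `Service` itself needing to know about path resolution.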
http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/monitor/jmx.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py
index 06c7dc8..ea407b0 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -13,12 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.services.kafka.directory import kafka_dir
 
 class JmxMixin(object):
     """This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
 
-    Note that this is not a service in its own right.
+    A couple things worth noting:
+    - this is not a service in its own right.
+    - we assume the service using JmxMixin also uses KafkaPathResolverMixin
     """
     def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=[]):
         self.jmx_object_names = jmx_object_names
@@ -38,8 +39,8 @@ class JmxMixin(object):
         if self.started[idx-1] or self.jmx_object_names is None:
             return
 
-        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.JmxTool " \
-              "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % (kafka_dir(node), self.jmx_port)
+        cmd = "%s kafka.tools.JmxTool " % self.path.script("kafka-run-class.sh", node)
+        cmd += "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
         for jmx_object_name in self.jmx_object_names:
             cmd += " --object-name %s" % jmx_object_name
         for jmx_attribute in self.jmx_attributes:

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index def27b1..b0f99d7 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -13,13 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.services.performance import PerformanceService
-from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
 
 import os
 
+from kafkatest.services.performance import PerformanceService
+from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.version import TRUNK, V_0_9_0_0
+
 
 class ConsumerPerformanceService(PerformanceService):
     """
@@ -135,7 +135,7 @@ class ConsumerPerformanceService(PerformanceService):
         cmd = "export LOG_DIR=%s;" % ConsumerPerformanceService.LOG_DIR
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG
-        cmd += " /opt/%s/bin/kafka-consumer-perf-test.sh" % kafka_dir(node)
+        cmd += " %s" % self.path.script("kafka-consumer-perf-test.sh", node)
         for key, value in self.args.items():
             cmd += " --%s %s" % (key, value)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index 2007d65..917ac85 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -13,13 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
+
 from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.version import TRUNK, V_0_9_0_0
 
-from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
-
-import os
 
 
 class EndToEndLatencyService(PerformanceService):
@@ -77,16 +76,16 @@ class EndToEndLatencyService(PerformanceService):
             'zk_connect': self.kafka.zk.connect_setting(),
             'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
             'config_file': EndToEndLatencyService.CONFIG_FILE,
-            'kafka_dir': kafka_dir(node)
+            'kafka_run_class': self.path.script("kafka-run-class.sh", node)
         })
 
         cmd = "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG
         if node.version >= V_0_9_0_0:
-            cmd += "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % args
+            cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s kafka.tools.EndToEndLatency " % args
             cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args
         else:
             # Set fetch max wait to 0 to match behavior in later versions
-            cmd += "KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_dir)s/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency " % args
+            cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s kafka.tools.TestEndToEndLatency " % args
             cmd += "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d 0 %(acks)d" % args
 
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s" % {'stdout': EndToEndLatencyService.STDOUT_CAPTURE,
@@ -104,7 +103,7 @@ class EndToEndLatencyService(PerformanceService):
         if node.version >= V_0_9_0_0:
             client_config += "compression_type=%(compression_type)s" % self.args
         node.account.create_file(EndToEndLatencyService.CONFIG_FILE, client_config)
-        
+
         self.security_config.setup_node(node)
 
         cmd = self.start_cmd(node)

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/performance/performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py
index dcc1a32..d6d4f14 100644
--- a/tests/kafkatest/services/performance/performance.py
+++ b/tests/kafkatest/services/performance/performance.py
@@ -14,9 +14,10 @@
 # limitations under the License.
 
 from ducktape.services.background_thread import BackgroundThreadService
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
 
-class PerformanceService(BackgroundThreadService):
+class PerformanceService(KafkaPathResolverMixin, BackgroundThreadService):
 
     def __init__(self, context, num_nodes, stop_timeout_sec=30):
         super(PerformanceService, self).__init__(context, num_nodes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index efd6c09..7131df1 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -13,16 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
+import subprocess
+
 from ducktape.utils.util import wait_until
 
+from kafkatest.directory_layout.kafka_path import  TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
 from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
-from kafkatest.services.kafka.version import TRUNK, V_0_9_0_0
-
-import os
-import subprocess
+from kafkatest.version import TRUNK, V_0_9_0_0
 
 
 class ProducerPerformanceService(JmxMixin, PerformanceService):
@@ -84,7 +84,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
             'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
             'jmx_port': self.jmx_port,
             'client_id': self.client_id,
-            'kafka_directory': kafka_dir(node)
+            'kafka_run_class': self.path.script("kafka-run-class.sh", node)
             })
 
         cmd = ""
@@ -92,12 +92,15 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
         if node.version < TRUNK:
             # In order to ensure more consistent configuration between versions, always use the ProducerPerformance
             # tool from trunk
-            cmd += "for file in /opt/%s/tools/build/libs/kafka-tools*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
-            cmd += "for file in /opt/%s/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
+            tools_jar = self.path.jar(TOOLS_JAR_NAME, TRUNK)
+            tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, TRUNK)
+
+            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
+            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
             cmd += "export CLASSPATH; "
 
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % ProducerPerformanceService.LOG4J_CONFIG
-        cmd += "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
+        cmd += "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \
               "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
 
         self.security_config.setup_node(node)

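[Editor's note] The ProducerPerformance hunk keeps the shell `for file in ...; do CLASSPATH=$CLASSPATH:$file; done` loops but feeds them resolver-supplied jar globs instead of hard-coded `/opt/.../build/libs` paths. A sketch of how that command fragment is assembled (the glob below is illustrative; in the patch it comes from `self.path.jar(...)`):

```python
# Hypothetical jar glob standing in for self.path.jar(TOOLS_JAR_NAME, TRUNK).
tools_jar = "/opt/kafka-trunk/tools/build/libs/kafka-tools*.jar"

cmd = ""
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
cmd += "export CLASSPATH; "
```

The glob is expanded by the shell on the worker node at run time, which is why the resolver can return a pattern rather than a concrete list of jar files.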
http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/replica_verification_tool.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py
index 7f77049..2033242 100644
--- a/tests/kafkatest/services/replica_verification_tool.py
+++ b/tests/kafkatest/services/replica_verification_tool.py
@@ -15,13 +15,13 @@
 
 from ducktape.services.background_thread import BackgroundThreadService
 
-from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.security.security_config import SecurityConfig
 
 import re
 
 
-class ReplicaVerificationTool(BackgroundThreadService):
+class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
 
     logs = {
         "producer_log": {
@@ -68,8 +68,8 @@ class ReplicaVerificationTool(BackgroundThreadService):
         return lag
 
     def start_cmd(self, node):
-        cmd = "/opt/%s/bin/" % kafka_dir(node)
-        cmd += "kafka-run-class.sh kafka.tools.ReplicaVerificationTool"
+        cmd = self.path.script("kafka-run-class.sh", node)
+        cmd += " kafka.tools.ReplicaVerificationTool"
         cmd += " --broker-list %s --topic-white-list %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms)
 
         cmd += " 2>> /mnt/replica_verification_tool.log | tee -a /mnt/replica_verification_tool.log &"

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/security/kafka_acls.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/kafka_acls.py b/tests/kafkatest/services/security/kafka_acls.py
index eb85354..5fcb5e7 100644
--- a/tests/kafkatest/services/security/kafka_acls.py
+++ b/tests/kafkatest/services/security/kafka_acls.py
@@ -13,12 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
-class ACLs():
 
-    def __init__(self):
-        pass
+class ACLs(KafkaPathResolverMixin):
+    def __init__(self, context):
+        self.context = context
 
     def set_acls(self, protocol, kafka, zk, topic, group):
         node = kafka.nodes[0]
@@ -35,7 +35,7 @@ class ACLs():
         self.acls_command(node, ACLs.consume_acl(setting, topic, group, client_principal))
 
     def acls_command(self, node, properties):
-        cmd = "/opt/%s/bin/kafka-acls.sh %s" % (kafka_dir(node), properties)
+        cmd = "%s %s" % (self.path.script("kafka-acls.sh", node), properties)
         node.account.ssh(cmd)
 
     @staticmethod

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/security/minikdc.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/minikdc.py b/tests/kafkatest/services/security/minikdc.py
index d83aede..0e7bb1b 100644
--- a/tests/kafkatest/services/security/minikdc.py
+++ b/tests/kafkatest/services/security/minikdc.py
@@ -13,18 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.services.service import Service
-from kafkatest.services.kafka.directory import kafka_dir
-
 import os
-from tempfile import mkstemp
-from shutil import move
-from os import remove, close
-from io import open
 import uuid
+from io import open
+from os import remove, close
+from shutil import move
+from tempfile import mkstemp
+
+from ducktape.services.service import Service
 
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, CORE_LIBS_JAR_NAME, CORE_DEPENDANT_TEST_LIBS_JAR_NAME
+from kafkatest.version import TRUNK
 
-class MiniKdc(Service):
+
+class MiniKdc(KafkaPathResolverMixin, Service):
 
     logs = {
         "minikdc_log": {
@@ -40,7 +42,7 @@ class MiniKdc(Service):
     LOCAL_KEYTAB_FILE = "/tmp/" + str(uuid.uuid4().get_hex()) + "_keytab"
     LOCAL_KRB5CONF_FILE = "/tmp/" + str(uuid.uuid4().get_hex()) + "_krb5.conf"
 
-    def __init__(self, context, kafka_nodes, extra_principals = ""):
+    def __init__(self, context, kafka_nodes, extra_principals=""):
         super(MiniKdc, self).__init__(context, 1)
         self.kafka_nodes = kafka_nodes
         self.extra_principals = extra_principals
@@ -66,9 +68,13 @@ class MiniKdc(Service):
         principals = 'client ' + kafka_principals + self.extra_principals
         self.logger.info("Starting MiniKdc with principals " + principals)
 
-        jar_paths = self.core_jar_paths(node, "dependant-testlibs") + self.core_jar_paths(node, "libs")
-        classpath = ":".join(jar_paths)
-        cmd = "INCLUDE_TEST_JARS=true CLASSPATH=%s /opt/%s/bin/kafka-run-class.sh kafka.security.minikdc.MiniKdc %s %s %s %s 1>> %s 2>> %s &" % (classpath, kafka_dir(node), MiniKdc.WORK_DIR, MiniKdc.PROPS_FILE, MiniKdc.KEYTAB_FILE, principals, MiniKdc.LOG_FILE, MiniKdc.LOG_FILE)
+        core_libs_jar = self.path.jar(CORE_LIBS_JAR_NAME, TRUNK)
+        core_dependant_test_libs_jar = self.path.jar(CORE_DEPENDANT_TEST_LIBS_JAR_NAME, TRUNK)
+
+        cmd = "for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_libs_jar
+        cmd += " for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_dependant_test_libs_jar
+        cmd += " export CLASSPATH;"
+        cmd += " %s kafka.security.minikdc.MiniKdc %s %s %s %s 1>> %s 2>> %s &" % (self.path.script("kafka-run-class.sh", node), MiniKdc.WORK_DIR, MiniKdc.PROPS_FILE, MiniKdc.KEYTAB_FILE, principals, MiniKdc.LOG_FILE, MiniKdc.LOG_FILE)
         self.logger.debug("Attempting to start MiniKdc on %s with command: %s" % (str(node.account), cmd))
         with node.account.monitor_log(MiniKdc.LOG_FILE) as monitor:
             node.account.ssh(cmd)
@@ -77,14 +83,9 @@ class MiniKdc(Service):
         node.account.scp_from(MiniKdc.KEYTAB_FILE, MiniKdc.LOCAL_KEYTAB_FILE)
         node.account.scp_from(MiniKdc.KRB5CONF_FILE, MiniKdc.LOCAL_KRB5CONF_FILE)
 
-        #KDC is set to bind openly (via 0.0.0.0). Change krb5.conf to hold the specific KDC address
+        # KDC is set to bind openly (via 0.0.0.0). Change krb5.conf to hold the specific KDC address
         self.replace_in_file(MiniKdc.LOCAL_KRB5CONF_FILE, '0.0.0.0', node.account.hostname)
 
-    def core_jar_paths(self, node, lib_dir_name):
-        lib_dir = "/opt/%s/core/build/%s" % (kafka_dir(node), lib_dir_name)
-        jars = node.account.ssh_capture("ls " + lib_dir)
-        return [os.path.join(lib_dir, jar.strip()) for jar in jars]
-
     def stop_node(self, node):
         self.logger.info("Stopping %s on %s" % (type(self).__name__, node.account.hostname))
         node.account.kill_process("apacheds", allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/simple_consumer_shell.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/simple_consumer_shell.py b/tests/kafkatest/services/simple_consumer_shell.py
index c44540d..7204748 100644
--- a/tests/kafkatest/services/simple_consumer_shell.py
+++ b/tests/kafkatest/services/simple_consumer_shell.py
@@ -15,10 +15,10 @@
 
 from ducktape.services.background_thread import BackgroundThreadService
 
-from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
 
-class SimpleConsumerShell(BackgroundThreadService):
+class SimpleConsumerShell(KafkaPathResolverMixin, BackgroundThreadService):
 
     logs = {
         "simple_consumer_shell_log": {
@@ -45,8 +45,8 @@ class SimpleConsumerShell(BackgroundThreadService):
         self.logger.debug(self.output)
 
     def start_cmd(self, node):
-        cmd = "/opt/%s/bin/" % kafka_dir(node)
-        cmd += "kafka-run-class.sh kafka.tools.SimpleConsumerShell"
+        cmd = self.path.script("kafka-run-class.sh", node)
+        cmd += " kafka.tools.SimpleConsumerShell"
         cmd += " --topic %s --broker-list %s --partition %s --no-wait-at-logend" % (self.topic, self.kafka.bootstrap_servers(), self.partition)
 
         cmd += " 2>> /mnt/get_simple_consumer_shell.log | tee -a /mnt/get_simple_consumer_shell.log &"

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 53d967e..875daee 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -13,15 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os.path
+import signal
+
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 
-from kafkatest.services.kafka.directory import kafka_dir
-import signal
-import os.path
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
 
-class StreamsSmokeTestBaseService(Service):
+class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
     PERSISTENT_ROOT = "/mnt/streams"
@@ -105,10 +106,10 @@ class StreamsSmokeTestBaseService(Service):
         args['stderr'] = self.STDERR_FILE
         args['pidfile'] = self.PID_FILE
         args['log4j'] = self.LOG4J_CONFIG_FILE
-        args['kafka_dir'] = kafka_dir(node)
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
 
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
-              "INCLUDE_TEST_JARS=true /opt/%(kafka_dir)s/bin/kafka-run-class.sh org.apache.kafka.streams.smoketest.StreamsSmokeTest " \
+              "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.smoketest.StreamsSmokeTest " \
               " %(command)s %(kafka)s %(zk)s %(state_dir)s " \
               " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 55304dc..9c6abdd 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -13,17 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.services.background_thread import BackgroundThreadService
-
-from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.kafka.version import TRUNK
-from kafkatest.services.kafka import TopicPartition
-
 import json
 import os
 import signal
 import subprocess
 
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.kafka import TopicPartition
+from kafkatest.version import TRUNK
+
 
 class ConsumerState:
     Dead = 1
@@ -112,7 +112,7 @@ class ConsumerEventHandler(object):
             return None
 
 
-class VerifiableConsumer(BackgroundThreadService):
+class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
     PERSISTENT_ROOT = "/mnt/verifiable_consumer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stdout")
     STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stderr")
@@ -226,9 +226,9 @@ class VerifiableConsumer(BackgroundThreadService):
         cmd += "export LOG_DIR=%s;" % VerifiableConsumer.LOG_DIR
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
-        cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \
+        cmd += self.path.script("kafka-run-class.sh", node) + " org.apache.kafka.tools.VerifiableConsumer" \
               " --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
-              (self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol),
+                                            (self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol),
                self.session_timeout_sec*1000, self.assignment_strategy, "--enable-autocommit" if self.enable_autocommit else "")
                
         if self.max_messages > 0:

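The change above illustrates the core pattern of this patch: instead of hard-coding `/opt/<kafka_dir>/bin/...`, a service mixes in `KafkaPathResolverMixin` and asks `self.path` for script locations. The resolver accepts either a cluster node (carrying a `version` attribute) or a bare version, which is why `ZookeeperService.query()` further down can pass `TRUNK` directly. A minimal sketch of the idea, where `PathResolver`, `PathResolverMixin`, and the `/opt/kafka-<version>` layout are simplified assumptions rather than the real kafkatest implementation:

```python
class FakeNode(object):
    """Stand-in for a ducktape cluster node carrying a Kafka version."""
    def __init__(self, version):
        self.version = version


class PathResolver(object):
    """Hypothetical resolver: installs assumed to live under /opt/kafka-<version>."""
    def home(self, node_or_version):
        # Accept either a node (use node.version) or a bare version string.
        version = getattr(node_or_version, "version", node_or_version)
        return "/opt/kafka-%s" % version

    def script(self, script_name, node_or_version):
        return "%s/bin/%s" % (self.home(node_or_version), script_name)


class PathResolverMixin(object):
    """Mix into a service class to expose `self.path`."""
    @property
    def path(self):
        return PathResolver()


class DemoService(PathResolverMixin):
    pass


svc = DemoService()
node = FakeNode("0.9.0.1")
print(svc.path.script("kafka-run-class.sh", node))     # resolved per node version
print(svc.path.script("kafka-run-class.sh", "trunk"))  # resolved from a bare version
```

Because path logic is now behind one interface, swapping in a different directory layout means plugging in a different resolver class rather than editing every service.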
http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index a6a1bd9..dbdf71f 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -13,19 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.services.background_thread import BackgroundThreadService
-
-from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
-from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
-from kafkatest.utils import is_int, is_int_with_prefix
-
 import json
 import os
 import signal
 import subprocess
 import time
 
-class VerifiableProducer(BackgroundThreadService):
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
+from kafkatest.utils import is_int, is_int_with_prefix
+from kafkatest.version import TRUNK, LATEST_0_8_2
+
+
+class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
     PERSISTENT_ROOT = "/mnt/verifiable_producer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stdout")
     LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
@@ -110,7 +111,6 @@ class VerifiableProducer(BackgroundThreadService):
         cmd = self.start_cmd(node, idx)
         self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))
 
-
         self.produced_count[idx] = 0
         last_produced_time = time.time()
         prev_msg = None
@@ -147,20 +147,23 @@ class VerifiableProducer(BackgroundThreadService):
                         self.clean_shutdown_nodes.add(node)
 
     def start_cmd(self, node, idx):
-
         cmd = ""
         if node.version <= LATEST_0_8_2:
             # 0.8.2.X releases do not have VerifiableProducer.java, so cheat and add
             # the tools jar from trunk to the classpath
-            cmd += "for file in /opt/%s/tools/build/libs/kafka-tools*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
-            cmd += "for file in /opt/%s/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
+            tools_jar = self.path.jar(TOOLS_JAR_NAME, TRUNK)
+            tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, TRUNK)
+
+            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
+            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
             cmd += "export CLASSPATH; "
 
         cmd += "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
-        cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer" \
-              " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol))
+        cmd += " " + self.path.script("kafka-run-class.sh", node)
+        cmd += " org.apache.kafka.tools.VerifiableProducer"
+        cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol))
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
         if self.throughput > 0:

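The producer's `start_cmd` above still builds the CLASSPATH the same way for pre-0.9 nodes; only the jar glob now comes from the resolver instead of a hard-coded `/opt/%s/tools/build/...` path. Note the glob is expanded by the shell on the remote node, not in Python. A small sketch of that fragment assembly (the helper name is mine, not kafkatest's):

```python
def classpath_prepend(jar_glob):
    """Build the shell fragment that appends every jar matching jar_glob
    to CLASSPATH; the glob expands on the remote host at run time."""
    return "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % jar_glob


# Assumed trunk-layout globs, mirroring what self.path.jar(...) would return.
cmd = ""
cmd += classpath_prepend("/opt/kafka-trunk/tools/build/libs/kafka-tools*.jar")
cmd += classpath_prepend("/opt/kafka-trunk/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar")
cmd += "export CLASSPATH; "
print(cmd)
```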
http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/services/zookeeper.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index fb73587..07e2c0c 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -14,18 +14,18 @@
 # limitations under the License.
 
 
+import re
+import subprocess
+import time
+
 from ducktape.services.service import Service
 
-from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
-
-import subprocess
-import time
-import re
+from kafkatest.version import TRUNK
 
 
-class ZookeeperService(Service):
+class ZookeeperService(KafkaPathResolverMixin, Service):
 
     logs = {
         "zk_log": {
@@ -73,7 +73,7 @@ class ZookeeperService(Service):
         node.account.create_file("/mnt/zookeeper.properties", config_file)
 
         start_cmd = "export KAFKA_OPTS=\"%s\";" % self.kafka_opts 
-        start_cmd += "/opt/%s/bin/zookeeper-server-start.sh " % kafka_dir(node)
+        start_cmd += "%s " % self.path.script("zookeeper-server-start.sh", node)
         start_cmd += "/mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &" % self.logs["zk_log"]
         node.account.ssh(start_cmd)
 
@@ -111,16 +111,17 @@ class ZookeeperService(Service):
     # the use of ZooKeeper ACLs.
     #
     def zookeeper_migration(self, node, zk_acl):
-        la_migra_cmd = "/opt/%s/bin/zookeeper-security-migration.sh --zookeeper.acl=%s --zookeeper.connect=%s" % (kafka_dir(node), zk_acl, self.connect_setting())
+        la_migra_cmd = "%s --zookeeper.acl=%s --zookeeper.connect=%s" % \
+                       (self.path.script("zookeeper-security-migration.sh", node), zk_acl, self.connect_setting())
         node.account.ssh(la_migra_cmd)
 
     def query(self, path):
         """
         Queries zookeeper for data associated with 'path' and returns all fields in the schema
         """
-        kafka_dir = KAFKA_TRUNK
-        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s get %s" % \
-              (kafka_dir, self.connect_setting(), path)
+        kafka_run_class = self.path.script("kafka-run-class.sh", TRUNK)
+        cmd = "%s kafka.tools.ZooKeeperMainWrapper -server %s get %s" % \
+              (kafka_run_class, self.connect_setting(), path)
         self.logger.debug(cmd)
 
         node = self.nodes[0]

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/tests/client/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
index 357fd17..a1ebf22 100644
--- a/tests/kafkatest/tests/client/message_format_change_test.py
+++ b/tests/kafkatest/tests/client/message_format_change_test.py
@@ -12,18 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.tests.test import Test
 from ducktape.mark import parametrize
 from ducktape.utils.util import wait_until
-from kafkatest.services.zookeeper import ZookeeperService
+
+from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils import is_int
+from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.kafka import config_property
-import time
+from kafkatest.utils import is_int
+from kafkatest.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
 
 
 class MessageFormatChangeTest(ProduceConsumeValidateTest):

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
index 2c261df..85fc9ae 100644
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -12,17 +12,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.tests.test import Test
 from ducktape.mark import parametrize
 from ducktape.utils.util import wait_until
-from kafkatest.services.zookeeper import ZookeeperService
+
+from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
+from kafkatest.services.kafka import config_property
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils import is_int
+from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.kafka import config_property
+from kafkatest.utils import is_int
+from kafkatest.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
+
 
 # Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
 class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index fdbedca..3977490 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -16,7 +16,6 @@
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.utils import is_int
@@ -34,7 +33,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         super(TestSecurityRollingUpgrade, self).__init__(test_context=test_context)
 
     def setUp(self):
-        self.acls = ACLs()
+        self.acls = ACLs(self.test_context)
         self.topic = "test_topic"
         self.group = "group"
         self.producer_throughput = 100

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/tests/core/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
index 9926f11..778d6a5 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -14,14 +14,15 @@
 # limitations under the License.
 
 from ducktape.mark import parametrize
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
-from kafkatest.services.verifiable_producer import VerifiableProducer
+
 from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
 from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
+from kafkatest.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
 
 
 class TestUpgrade(ProduceConsumeValidateTest):

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
index 7f80deb..7e722f7 100644
--- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
@@ -19,11 +19,9 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.services.security.kafka_acls import ACLs
 from kafkatest.utils import is_int
-import time
 
 class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
     """Tests a rolling upgrade for zookeeper.
@@ -38,7 +36,7 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
         self.producer_throughput = 100
         self.num_producers = 1
         self.num_consumers = 1
-        self.acls = ACLs()
+        self.acls = ACLs(self.test_context)
 
         self.zk = ZookeeperService(self.test_context, num_nodes=3)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/kafkatest/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
new file mode 100644
index 0000000..6b378e7
--- /dev/null
+++ b/tests/kafkatest/version.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from kafkatest.utils import kafkatest_version
+
+from distutils.version import LooseVersion
+
+
+class KafkaVersion(LooseVersion):
+    """Container for kafka versions which makes versions simple to compare.
+
+    distutils.version.LooseVersion (and StrictVersion) has robust comparison and ordering logic.
+
+    Example:
+
+        v10 = KafkaVersion("0.10.0")
+        v9 = KafkaVersion("0.9.0.1")
+        assert v10 > v9  # assertion passes!
+    """
+    def __init__(self, version_string):
+        self.is_trunk = (version_string.lower() == "trunk")
+        if self.is_trunk:
+            # Since "trunk" may actually be a branch that is not trunk,
+            # use kafkatest_version() for comparison purposes,
+            # and track whether we're in "trunk" with a flag
+            version_string = kafkatest_version()
+
+            # Drop dev suffix if present
+            dev_suffix_index = version_string.find(".dev")
+            if dev_suffix_index >= 0:
+                version_string = version_string[:dev_suffix_index]
+
+        # Don't use the form super.(...).__init__(...) because
+        # LooseVersion is an "old style" python class
+        LooseVersion.__init__(self, version_string)
+
+    def __str__(self):
+        if self.is_trunk:
+            return "trunk"
+        else:
+            return LooseVersion.__str__(self)
+
+
+def get_version(node=None):
+    """Return the version attached to the given node.
+    Default to trunk if node or node.version is undefined (aka None)
+    """
+    if node is not None and hasattr(node, "version") and node.version is not None:
+        return node.version
+    else:
+        return TRUNK
+
+TRUNK = KafkaVersion("trunk")
+
+# 0.8.2.X versions
+V_0_8_2_1 = KafkaVersion("0.8.2.1")
+V_0_8_2_2 = KafkaVersion("0.8.2.2")
+LATEST_0_8_2 = V_0_8_2_2
+
+# 0.9.0.X versions
+V_0_9_0_0 = KafkaVersion("0.9.0.0")
+V_0_9_0_1 = KafkaVersion("0.9.0.1")
+LATEST_0_9 = V_0_9_0_1
+
+# 0.10.0.X versions
+V_0_10_0_0 = KafkaVersion("0.10.0.0")
+LATEST_0_10 = V_0_10_0_0

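The reason `KafkaVersion` extends `LooseVersion` rather than comparing strings is ordering: lexicographically `"0.10.0.0" < "0.9.0.1"`, which would sort releases backwards. A tiny stand-in helper (my own, mirroring what `LooseVersion` does for purely numeric version strings) makes the component-wise comparison explicit:

```python
def version_key(version_string):
    """Split a dotted numeric version into an int tuple for comparison."""
    return tuple(int(part) for part in version_string.split("."))


# Plain string comparison orders 0.10.x before 0.9.x -- wrong:
assert "0.10.0.0" < "0.9.0.1"
# Component-wise comparison gets it right:
assert version_key("0.10.0.0") > version_key("0.9.0.1")
assert version_key("0.8.2.1") < version_key("0.8.2.2")
```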
http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/setup.cfg
----------------------------------------------------------------------
diff --git a/tests/setup.cfg b/tests/setup.cfg
new file mode 100644
index 0000000..c70f1e4
--- /dev/null
+++ b/tests/setup.cfg
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pytest configuration (can also be defined in in tox.ini or pytest.ini file)
+#
+# This file defines naming convention and root search directory for autodiscovery of
+# pytest unit tests for the system test service classes.
+#
+# To ease possible confusion, 'check' instead of 'test' as a prefix for unit tests, since
+# many system test files, classes, and methods have 'test' somewhere in the name
+[pytest]
+testpaths=unit
+python_files=check_*.py
+python_classes=Check
+python_functions=check_*
+
+# don't search inside any resources directory for unit tests
+norecursedirs = resources

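Under these conventions, py.test discovers `check_*.py` files under `tests/unit/`, classes starting with `Check`, and methods starting with `check_`, instead of the default `test_*` pattern. A hypothetical unit check that would be picked up (file and class names are illustrative):

```python
# contents of a hypothetical tests/unit/check_example.py

class CheckVersionOrdering(object):
    """Discovered because the class name starts with 'Check'."""

    def check_component_compare(self):
        # Discovered because the method name starts with 'check_'.
        assert [0, 10, 0, 0] > [0, 9, 0, 1]
```

Renaming the prefix this way avoids py.test accidentally collecting the system test classes themselves, since those already have "test" throughout their file and class names.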
http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/setup.py
----------------------------------------------------------------------
diff --git a/tests/setup.py b/tests/setup.py
index 910c0a2..9961508 100644
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -15,12 +15,33 @@
 # see kafka.server.KafkaConfig for additional details and defaults
 
 import re
+import sys
 from setuptools import find_packages, setup
+from setuptools.command.test import test as TestCommand
 
 version = ''
 with open('kafkatest/__init__.py', 'r') as fd:
-    version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
-                        fd.read(), re.MULTILINE).group(1)
+    version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', fd.read(), re.MULTILINE).group(1)
+
+
+class PyTest(TestCommand):
+    user_options = [('pytest-args=', 'a', "Arguments to pass to py.test")]
+
+    def initialize_options(self):
+        TestCommand.initialize_options(self)
+        self.pytest_args = []
+
+    def finalize_options(self):
+        TestCommand.finalize_options(self)
+        self.test_args = []
+        self.test_suite = True
+
+    def run_tests(self):
+        # import here, cause outside the eggs aren't loaded
+        import pytest
+        print self.pytest_args
+        errno = pytest.main(self.pytest_args)
+        sys.exit(errno)
 
 setup(name="kafkatest",
       version=version,
@@ -30,5 +51,7 @@ setup(name="kafkatest",
       license="apache2.0",
       packages=find_packages(),
       include_package_data=True,
-      install_requires=["ducktape==0.5.0", "requests>=2.5.0"]
+      install_requires=["ducktape==0.5.0", "requests>=2.5.0"],
+      tests_require=["pytest", "mock"],
+      cmdclass={'test': PyTest},
       )

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/unit/__init__.py
----------------------------------------------------------------------
diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/unit/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/aabf8251/tests/unit/directory_layout/__init__.py
----------------------------------------------------------------------
diff --git a/tests/unit/directory_layout/__init__.py b/tests/unit/directory_layout/__init__.py
new file mode 100644
index 0000000..ec20143
--- /dev/null
+++ b/tests/unit/directory_layout/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

