kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7225; Corrected system tests by generating external properties file (#5489)
Date Thu, 23 Aug 2018 21:23:02 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 19b8ac5  KAFKA-7225; Corrected system tests by generating external properties file (#5489)
19b8ac5 is described below

commit 19b8ac55c389e4b2022476431a28c8431caed52a
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Thu Aug 23 14:22:09 2018 -0700

    KAFKA-7225; Corrected system tests by generating external properties file (#5489)
    
    Fix system tests from earlier #5445 by moving the creation and cleanup of a file that can be used as externalized secrets in connector configs into the `ConnectServiceBase` class.
    
    Reviewers: Arjun Satish <arjun@confluent.io>, Robert Yokota <rayokota@gmail.com>,
Konstantine Karantasis <konstantine@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 tests/kafkatest/services/connect.py                   | 19 ++++++++++++++++++-
 tests/kafkatest/tests/connect/connect_test.py         | 13 +++++++------
 .../connect/templates/connect-standalone.properties   |  3 +++
 3 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 19beddd..d8c8d5a 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -40,6 +40,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     STDERR_FILE = os.path.join(PERSISTENT_ROOT, "connect.stderr")
     LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "connect-log4j.properties")
     PID_FILE = os.path.join(PERSISTENT_ROOT, "connect.pid")
+    EXTERNAL_CONFIGS_FILE = os.path.join(PERSISTENT_ROOT, "connect-external-configs.properties")
     CONNECT_REST_PORT = 8083
 
     # Currently the Connect worker supports waiting on three modes:
@@ -69,6 +70,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         self.files = files
         self.startup_mode = self.STARTUP_MODE_LISTEN
         self.environment = {}
+        self.external_config_template_func = None
 
     def pids(self, node):
         """Return process ids for Kafka Connect processes."""
@@ -87,6 +89,17 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         self.config_template_func = config_template_func
         self.connector_config_templates = connector_config_templates
 
+    def set_external_configs(self, external_config_template_func):
+        """
+        Set the properties that will be written in the external file properties
+        as used by the org.apache.kafka.common.config.provider.FileConfigProvider.
+        When this is used, the worker configuration must also enable the FileConfigProvider.
+        This is not provided in the constructor in case the worker
+        config generally needs access to ZK/Kafka services to
+        create the configuration.
+        """
+        self.external_config_template_func = external_config_template_func
+
     def listening(self, node):
         try:
             cmd = "nc -z %s %s" % (node.account.hostname, self.CONNECT_REST_PORT)
@@ -145,7 +158,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def clean_node(self, node):
         node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
         self.security_config.clean_node(node)
-        all_files = " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE,
self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files)
+        all_files = " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE,
self.STDOUT_FILE, self.STDERR_FILE, self.EXTERNAL_CONFIGS_FILE] + self.config_filenames()
+ self.files)
         node.account.ssh("rm -rf " + all_files, allow_fail=False)
 
     def config_filenames(self):
@@ -263,6 +276,8 @@ class ConnectStandaloneService(ConnectServiceBase):
         node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
 
         self.security_config.setup_node(node)
+        if self.external_config_template_func:
+            node.account.create_file(self.EXTERNAL_CONFIGS_FILE, self.external_config_template_func(node))
         node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
         node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties',
log_file=self.LOG_FILE))
         remote_connector_configs = []
@@ -308,6 +323,8 @@ class ConnectDistributedService(ConnectServiceBase):
         node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
 
         self.security_config.setup_node(node)
+        if self.external_config_template_func:
+            node.account.create_file(self.EXTERNAL_CONFIGS_FILE, self.external_config_template_func(node))
         node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
         node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties',
log_file=self.LOG_FILE))
         if self.connector_config_templates:
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index c961681..e2618e9 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -47,7 +47,7 @@ class ConnectStandaloneFileTest(Test):
 
     OFFSETS_FILE = "/mnt/connect.offsets"
 
-    TOPIC = "${file:/mnt/connect/connect-file-external.properties:topic.external}"
+    TOPIC = "${file:" + EXTERNAL_CONFIGS_FILE + ":topic.external}"
     TOPIC_TEST = "test"
 
     FIRST_INPUT_LIST = ["foo", "bar", "baz"]
@@ -100,14 +100,12 @@ class ConnectStandaloneFileTest(Test):
         self.zk.start()
         self.kafka.start()
 
-        source_external_props = os.path.join(self.source.PERSISTENT_ROOT, "connect-file-external.properties")
-        self.source.node.account.create_file(source_external_props, self.render('connect-file-external.properties'))
         self.source.set_configs(lambda node: self.render("connect-standalone.properties",
node=node), [self.render("connect-file-source.properties")])
-
-        sink_external_props = os.path.join(self.sink.PERSISTENT_ROOT, "connect-file-external.properties")
-        self.sink.node.account.create_file(sink_external_props, self.render('connect-file-external.properties'))
         self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node),
[self.render("connect-file-sink.properties")])
 
+        self.source.set_external_configs(lambda node: self.render("connect-file-external.properties",
node=node))
+        self.sink.set_external_configs(lambda node: self.render("connect-file-external.properties",
node=node))
+
         self.source.start()
         self.sink.start()
 
@@ -182,6 +180,9 @@ class ConnectStandaloneFileTest(Test):
         self.override_value_converter_schemas_enable = False
         self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node),
[self.render("connect-file-sink.properties")])
 
+        self.source.set_external_configs(lambda node: self.render("connect-file-external.properties",
node=node))
+        self.sink.set_external_configs(lambda node: self.render("connect-file-external.properties",
node=node))
+
         self.source.start()
         self.sink.start()
 
diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
index cbfe491..a471dd5 100644
--- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
@@ -32,5 +32,8 @@ offset.storage.file.filename={{ OFFSETS_FILE }}
 # Reduce the admin client request timeouts so that we don't wait the default 120 sec before
failing to connect the admin client
 request.timeout.ms=30000
 
+# Allow connector configs to use externalized config values of the form:
+#   ${file:/mnt/connect/connect-external-configs.properties:topic.external}
+#
 config.providers=file
 config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider


Mime
View raw message