kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-4544: Add system tests for delegation token based authentication
Date Mon, 03 Dec 2018 05:58:52 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e7ce0e7  KAFKA-4544: Add system tests for delegation token based authentication
e7ce0e7 is described below

commit e7ce0e7e0a82bb56b1106479c0c8a7984fd92604
Author: Attila Sasvari <asasvari@apache.org>
AuthorDate: Mon Dec 3 11:28:36 2018 +0530

    KAFKA-4544: Add system tests for delegation token based authentication
    
    This change adds some basic system tests for delegation token based authentication:
    - basic delegation token creation
    - producing with a delegation token
    - consuming with a delegation token
    - expiring a delegation token
    - producing with an expired delegation token
    
    New files:
    - delegation_tokens.py: a wrapper around kafka-delegation-tokens.sh  - executed in container
where a secure Broker is running (taking advantage of automatic cleanup)
    - delegation_tokens_test.py: basic test to validate the lifecycle of a delegation token
    
    Changes were made in the following file to extend their functionality:
    - config_property was updated to be able to configure Kafka brokers with delegation token
related settings
    - jaas.conf template because a broker needs to support multiple login modules when delegation
tokens are used
    - consule-consumer and verifiable_producer to override KAFKA_OPTS (to specify custom jaas.conf)
and the client properties (to authenticate with delegation token).
    
    Author: Attila Sasvari <asasvari@apache.org>
    
    Reviewers: Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, Andras Katona <41361962+akatona84@users.noreply.github.com>,
Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Closes #5660 from asasvari/KAFKA-4544
---
 tests/kafkatest/services/console_consumer.py       |  31 ++++-
 tests/kafkatest/services/delegation_tokens.py      | 102 ++++++++++++++++
 tests/kafkatest/services/kafka/config_property.py  |   6 +
 .../services/security/templates/jaas.conf          |   6 +-
 tests/kafkatest/services/verifiable_producer.py    |  41 ++++---
 .../kafkatest/tests/core/delegation_token_test.py  | 130 +++++++++++++++++++++
 6 files changed, 294 insertions(+), 22 deletions(-)

diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 65c9fa5..dfbec9f 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -60,8 +60,9 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
     def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group",
new_consumer=True,
                  message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=DEV_BRANCH,
                  client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
-                 enable_systest_events=False, stop_timeout_sec=30, print_timestamp=False,
-                 isolation_level="read_uncommitted", jaas_override_variables=None):
+                 enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False,
+                 isolation_level="read_uncommitted", jaas_override_variables=None,
+                 kafka_opts_override="", client_prop_file_override=""):
         """
         Args:
             context:                    standard context
@@ -83,7 +84,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
             print_timestamp             if True, print each message's timestamp as well
             isolation_level             How to handle transactional messages.
             jaas_override_variables     A dict of variables to be used in the jaas.conf template
file
-
+            kafka_opts_override         Override parameters of the KAFKA_OPTS environment
variable
+            client_prop_file_override   Override client.properties file used by the consumer
         """
         JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes
or []),
                           root=ConsoleConsumer.PERSISTENT_ROOT)
@@ -116,6 +118,9 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
 
         self.print_timestamp = print_timestamp
         self.jaas_override_variables = jaas_override_variables or {}
+        self.kafka_opts_override = kafka_opts_override
+        self.client_prop_file_override = client_prop_file_override
+
 
     def prop_file(self, node):
         """Return a string which can be used to create a configuration file appropriate for
the given node."""
@@ -134,6 +139,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         prop_file += str(self.security_config)
         return prop_file
 
+
     def start_cmd(self, node):
         """Return the start command appropriate for the given node."""
         args = self.args.copy()
@@ -147,14 +153,19 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         args['jmx_port'] = self.jmx_port
         args['console_consumer'] = self.path.script("kafka-console-consumer.sh", node)
         args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
-        args['kafka_opts'] = self.security_config.kafka_opts
+
+        if self.kafka_opts_override:
+            args['kafka_opts'] = "\"%s\"" % self.kafka_opts_override
+        else:
+            args['kafka_opts'] = self.security_config.kafka_opts
 
         cmd = "export JMX_PORT=%(jmx_port)s; " \
               "export LOG_DIR=%(log_dir)s; " \
               "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; "
\
               "export KAFKA_OPTS=%(kafka_opts)s; " \
               "%(console_consumer)s " \
-              "--topic %(topic)s --consumer.config %(config_file)s" % args
+              "--topic %(topic)s " \
+              "--consumer.config %(config_file)s " % args
 
         if self.new_consumer:
             assert node.version >= V_0_9_0_0, \
@@ -209,7 +220,15 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         # Create and upload config file
         self.logger.info("console_consumer.properties:")
 
-        prop_file = self.prop_file(node)
+        self.security_config = self.kafka.security_config.client_config(node=node,
+                                                                        jaas_override_variables=self.jaas_override_variables)
+        self.security_config.setup_node(node)
+
+        if self.client_prop_file_override:
+            prop_file = self.client_prop_file_override
+        else:
+            prop_file = self.prop_file(node)
+
         self.logger.info(prop_file)
         node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
 
diff --git a/tests/kafkatest/services/delegation_tokens.py b/tests/kafkatest/services/delegation_tokens.py
new file mode 100644
index 0000000..34da16b
--- /dev/null
+++ b/tests/kafkatest/services/delegation_tokens.py
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os.path
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+
+"""
+Delegation tokens is a tool to manage the lifecycle of delegation tokens.
+All commands are executed on a secured Kafka node reusing its generated jaas.conf and krb5.conf.
+"""
+
+class DelegationTokens(KafkaPathResolverMixin):
+    def __init__(self, kafka, context):
+        self.client_properties_content = """
+security.protocol=SASL_PLAINTEXT
+sasl.kerberos.service.name=kafka
+"""
+        self.context = context
+        self.command_path = self.path.script("kafka-delegation-tokens.sh")
+        self.kafka_opts = "KAFKA_OPTS=\"-Djava.security.auth.login.config=/mnt/security/jaas.conf
" \
+                          "-Djava.security.krb5.conf=/mnt/security/krb5.conf\" "
+        self.kafka = kafka
+        self.bootstrap_server = " --bootstrap-server " + self.kafka.bootstrap_servers('SASL_PLAINTEXT')
+        self.base_cmd = self.kafka_opts + self.command_path + self.bootstrap_server
+        self.client_prop_path = os.path.join(self.kafka.PERSISTENT_ROOT, "client.properties")
+        self.jaas_deleg_conf_path = os.path.join(self.kafka.PERSISTENT_ROOT, "jaas_deleg.conf")
+        self.token_hmac_path = os.path.join(self.kafka.PERSISTENT_ROOT, "deleg_token_hmac.out")
+        self.delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, "delegation_token.out")
+        self.expire_delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, "expire_delegation_token.out")
+        self.renew_delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, "renew_delegation_token.out")
+
+        self.node = self.kafka.nodes[0]
+
+    def generate_delegation_token(self, maxlifetimeperiod=-1):
+        self.node.account.create_file(self.client_prop_path, self.client_properties_content)
+
+        cmd = self.base_cmd + "  --create" \
+                              "  --max-life-time-period %s" \
+                              "  --command-config %s > %s" % (maxlifetimeperiod, self.client_prop_path,
self.delegation_token_out)
+        self.node.account.ssh(cmd, allow_fail=False)
+
+    def expire_delegation_token(self, hmac):
+        cmd = self.base_cmd + "  --expire" \
+                              "  --expiry-time-period -1" \
+                              "  --hmac %s" \
+                              "  --command-config %s > %s" % (hmac, self.client_prop_path,
self.expire_delegation_token_out)
+        self.node.account.ssh(cmd, allow_fail=False)
+
+    def renew_delegation_token(self, hmac, renew_time_period=-1):
+        cmd = self.base_cmd + "  --renew" \
+                              "  --renew-time-period %s" \
+                              "  --hmac %s" \
+                              "  --command-config %s > %s" \
+              % (renew_time_period, hmac, self.client_prop_path, self.renew_delegation_token_out)
+        return self.node.account.ssh_capture(cmd, allow_fail=False)
+
+    def create_jaas_conf_with_delegation_token(self):
+        dt = self.parse_delegation_token_out()
+        jaas_deleg_content = """
+KafkaClient {
+  org.apache.kafka.common.security.scram.ScramLoginModule required
+  username="%s"
+  password="%s"
+  tokenauth=true;
+};
+""" % (dt["tokenid"], dt["hmac"])
+        self.node.account.create_file(self.jaas_deleg_conf_path, jaas_deleg_content)
+
+        return jaas_deleg_content
+
+    def token_hmac(self):
+        dt = self.parse_delegation_token_out()
+        return dt["hmac"]
+
+    def parse_delegation_token_out(self):
+        cmd = "tail -1 %s" % self.delegation_token_out
+
+        output_iter = self.node.account.ssh_capture(cmd, allow_fail=False)
+        output = ""
+        for line in output_iter:
+            output += line
+
+        tokenid, hmac, owner, renewers, issuedate, expirydate, maxdate = output.split()
+        return {"tokenid" : tokenid,
+                "hmac" : hmac,
+                "owner" : owner,
+                "renewers" : renewers,
+                "issuedate" : issuedate,
+                "expirydate" :expirydate,
+                "maxdate" : maxdate}
\ No newline at end of file
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index 621b8e5..853bd83 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -50,6 +50,12 @@ REPLICA_HIGHWATERMARK_CHECKPOINT_INTERVAL_MS = "replica.high.watermark.checkpoin
 LOG_ROLL_TIME_MS = "log.roll.ms"
 OFFSETS_TOPIC_NUM_PARTITIONS = "offsets.topic.num.partitions"
 
+DELEGATION_TOKEN_MAX_LIFETIME_MS="delegation.token.max.lifetime.ms"
+DELEGATION_TOKEN_EXPIRY_TIME_MS="delegation.token.expiry.time.ms"
+DELEGATION_TOKEN_MASTER_KEY="delegation.token.master.key"
+SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms"
+
+
 """
 From KafkaConfig.scala
 
diff --git a/tests/kafkatest/services/security/templates/jaas.conf b/tests/kafkatest/services/security/templates/jaas.conf
index e251145..3d6c93e 100644
--- a/tests/kafkatest/services/security/templates/jaas.conf
+++ b/tests/kafkatest/services/security/templates/jaas.conf
@@ -15,7 +15,7 @@
 {% if static_jaas_conf %}
 KafkaClient {
 {% endif %}
-{% if client_sasl_mechanism == "GSSAPI" %}
+{% if "GSSAPI" in client_sasl_mechanism %}
 {% if is_ibm_jdk %}
     com.ibm.security.auth.module.Krb5LoginModule required debug=false
     credsType=both
@@ -33,7 +33,7 @@ KafkaClient {
 	org.apache.kafka.common.security.plain.PlainLoginModule required
 	username="client"
 	password="client-secret";
-{% elif client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == "SCRAM-SHA-512"
%}
+{% elif "SCRAM-SHA-256" in client_sasl_mechanism or "SCRAM-SHA-512" in client_sasl_mechanism
 %}
 	org.apache.kafka.common.security.scram.ScramLoginModule required
 	username="{{ SecurityConfig.SCRAM_CLIENT_USER }}"
 	password="{{ SecurityConfig.SCRAM_CLIENT_PASSWORD }}";
@@ -65,7 +65,7 @@ KafkaServer {
 	user_client="client-secret"
 	user_kafka="kafka-secret";
 {% endif %}
-{% if client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == "SCRAM-SHA-512"
%}
+{% if "SCRAM-SHA-256" in client_sasl_mechanism or "SCRAM-SHA-512" in client_sasl_mechanism
%}
 	org.apache.kafka.common.security.scram.ScramLoginModule required
 	username="{{ SecurityConfig.SCRAM_BROKER_USER }}"
 	password="{{ SecurityConfig.SCRAM_BROKER_PASSWORD }}";
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 7fa2654..f339a62 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -58,18 +58,21 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin,
Backgrou
                  message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
                  stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO",
                  enable_idempotence=False, offline_nodes=[], create_time=-1, repeating_keys=None,
-                 jaas_override_variables=None):
+                 jaas_override_variables=None, kafka_opts_override="", client_prop_file_override=""):
         """
-        :param max_messages is a number of messages to be produced per producer
-        :param message_validator checks for an expected format of messages produced. There
are
-        currently two:
-               * is_int is an integer format; this is default and expected to be used if
-               num_nodes = 1
-               * is_int_with_prefix recommended if num_nodes > 1, because otherwise each
producer
-               will produce exactly same messages, and validation may miss missing messages.
-        :param compression_types: If None, all producers will not use compression; or a list
of
-        compression types, one per producer (could be "none").
-        :param jaas_override_variables: A dict of variables to be used in the jaas.conf template
file
+        Args:
+            :param max_messages                number of messages to be produced per producer
+            :param message_validator           checks for an expected format of messages
produced. There are
+                                               currently two:
+                                               * is_int is an integer format; this is default
and expected to be used if
+                                                 num_nodes = 1
+                                               * is_int_with_prefix recommended if num_nodes
> 1, because otherwise each producer
+                                                 will produce exactly same messages, and
validation may miss missing messages.
+            :param compression_types           If None, all producers will not use compression;
or a list of compression types,
+                                               one per producer (could be "none").
+            :param jaas_override_variables     A dict of variables to be used in the jaas.conf
template file
+            :param kafka_opts_override         Override parameters of the KAFKA_OPTS environment
variable
+            :param client_prop_file_override   Override client.properties file used by the
consumer
         """
         super(VerifiableProducer, self).__init__(context, num_nodes)
         self.log_level = log_level
@@ -97,6 +100,9 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin,
Backgrou
         self.create_time = create_time
         self.repeating_keys = repeating_keys
         self.jaas_override_variables = jaas_override_variables or {}
+        self.kafka_opts_override = kafka_opts_override
+        self.client_prop_file_override = client_prop_file_override
+
 
     def java_class_name(self):
         return "VerifiableProducer"
@@ -125,7 +131,11 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin,
Backgrou
         self.security_config.setup_node(node)
 
         # Create and upload config file
-        producer_prop_file = self.prop_file(node)
+        if self.client_prop_file_override:
+            producer_prop_file = self.client_prop_file_override
+        else:
+            producer_prop_file = self.prop_file(node)
+
         if self.acks is not None:
             self.logger.info("VerifiableProducer (index = %d) will use acks = %s", idx, self.acks)
             producer_prop_file += "\nacks=%s\n" % self.acks
@@ -189,7 +199,11 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin,
Backgrou
 
     def start_cmd(self, node, idx):
         cmd  = "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR
-        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+        if self.kafka_opts_override:
+            cmd += " export KAFKA_OPTS=\"%s\";" % self.kafka_opts_override
+        else:
+            cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
         cmd += self.impl.exec_cmd(node)
         cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol,
True, self.offline_nodes))
@@ -207,6 +221,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin,
Backgrou
             cmd += " --repeating-keys %s " % str(self.repeating_keys)
 
         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
+
         cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
         return cmd
 
diff --git a/tests/kafkatest/tests/core/delegation_token_test.py b/tests/kafkatest/tests/core/delegation_token_test.py
new file mode 100644
index 0000000..0b2b6eb
--- /dev/null
+++ b/tests/kafkatest/tests/core/delegation_token_test.py
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import config_property, KafkaService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.delegation_tokens import DelegationTokens
+from kafkatest.services.verifiable_producer import VerifiableProducer
+
+from datetime import datetime
+import time
+
+"""
+Basic tests to validate delegation token support
+"""
+class DelegationTokenTest(Test):
+    def __init__(self, test_context):
+        super(DelegationTokenTest, self).__init__(test_context)
+
+        self.test_context = test_context
+        self.topic = "topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka",
+                                  topics={self.topic: {"partitions": 1, "replication-factor":
1}},
+                                  server_prop_overides=[
+                                      [config_property.DELEGATION_TOKEN_MAX_LIFETIME_MS,
"604800000"],
+                                      [config_property.DELEGATION_TOKEN_EXPIRY_TIME_MS, "86400000"],
+                                      [config_property.DELEGATION_TOKEN_MASTER_KEY, "test12345"],
+                                      [config_property.SASL_ENABLED_MECHANISMS, "GSSAPI,SCRAM-SHA-256"]
+                                  ])
+        self.jaas_deleg_conf_path = "/tmp/jaas_deleg.conf"
+        self.jaas_deleg_conf = ""
+        self.client_properties_content = """
+security.protocol=SASL_PLAINTEXT
+sasl.mechanism=SCRAM-SHA-256
+sasl.kerberos.service.name=kafka
+client.id=console-consumer
+"""
+        self.client_kafka_opts=' -Djava.security.auth.login.config=' + self.jaas_deleg_conf_path
+
+        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka,
topic=self.topic, max_messages=1,
+                                       throughput=1, kafka_opts_override=self.client_kafka_opts,
+                                       client_prop_file_override=self.client_properties_content)
+
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka,
topic=self.topic,
+                                        kafka_opts_override=self.client_kafka_opts,
+                                        client_prop_file_override=self.client_properties_content)
+
+        self.kafka.security_protocol = 'SASL_PLAINTEXT'
+        self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256'
+        self.kafka.interbroker_sasl_mechanism = 'GSSAPI'
+
+
+    def setUp(self):
+        self.zk.start()
+
+    def tearDown(self):
+        self.producer.nodes[0].account.remove(self.jaas_deleg_conf_path)
+        self.consumer.nodes[0].account.remove(self.jaas_deleg_conf_path)
+
+    def generate_delegation_token(self):
+        self.logger.debug("Request delegation token")
+        self.delegation_tokens.generate_delegation_token()
+        self.jaas_deleg_conf = self.delegation_tokens.create_jaas_conf_with_delegation_token()
+
+    def expire_delegation_token(self):
+        self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256'
+        token_hmac = self.delegation_tokens.token_hmac()
+        self.delegation_tokens.expire_delegation_token(token_hmac)
+
+
+    def produce_with_delegation_token(self):
+        self.producer.acked_values = []
+        self.producer.nodes[0].account.create_file(self.jaas_deleg_conf_path, self.jaas_deleg_conf)
+        self.logger.debug(self.jaas_deleg_conf)
+        self.producer.start()
+
+    def consume_with_delegation_token(self):
+        self.logger.debug("Consume messages with delegation token")
+
+        self.consumer.nodes[0].account.create_file(self.jaas_deleg_conf_path, self.jaas_deleg_conf)
+        self.logger.debug(self.jaas_deleg_conf)
+        self.consumer.consumer_timeout_ms = 5000
+
+        self.consumer.start()
+        self.consumer.wait()
+
+    def get_datetime_ms(self, input_date):
+        return int(time.mktime(datetime.strptime(input_date,"%Y-%m-%dT%H:%M").timetuple())
* 1000)
+
+    def renew_delegation_token(self):
+        dt = self.delegation_tokens.parse_delegation_token_out()
+        orig_expiry_date_ms = self.get_datetime_ms(dt["expirydate"])
+        new_expirydate_ms = orig_expiry_date_ms + 1000
+
+        self.delegation_tokens.renew_delegation_token(dt["hmac"], new_expirydate_ms)
+
+    def test_delegation_token_lifecycle(self):
+        self.kafka.start()
+        self.delegation_tokens = DelegationTokens(self.kafka, self.test_context)
+
+        self.generate_delegation_token()
+        self.renew_delegation_token()
+        self.produce_with_delegation_token()
+        wait_until(lambda: self.producer.num_acked > 0, timeout_sec=30,
+                   err_msg="Expected producer to still be producing.")
+        assert 1 == self.producer.num_acked, "number of acked messages: %d" % self.producer.num_acked
+
+        self.consume_with_delegation_token()
+        num_consumed = len(self.consumer.messages_consumed[1])
+        assert 1 == num_consumed, "number of consumed messages: %d" % num_consumed
+
+        self.expire_delegation_token()
+
+        self.produce_with_delegation_token()
+        assert 0 == self.producer.num_acked, "number of acked messages: %d" % self.producer.num_acked
\ No newline at end of file


Mime
View raw message