From commits-return-10821-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Mon Dec 3 06:58:55 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id BDB51180645 for ; Mon, 3 Dec 2018 06:58:54 +0100 (CET) Received: (qmail 72826 invoked by uid 500); 3 Dec 2018 05:58:53 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 72817 invoked by uid 99); 3 Dec 2018 05:58:53 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Dec 2018 05:58:53 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 262AB804DE; Mon, 3 Dec 2018 05:58:53 +0000 (UTC) Date: Mon, 03 Dec 2018 05:58:52 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-4544: Add system tests for delegation token based authentication MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154381673144.23639.18139760942405478446@gitbox.apache.org> From: manikumar@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: c4822648ef1bb0cd6825d8dbe465c8a5a26a76bc X-Git-Newrev: e7ce0e7e0a82bb56b1106479c0c8a7984fd92604 X-Git-Rev: e7ce0e7e0a82bb56b1106479c0c8a7984fd92604 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e7ce0e7 KAFKA-4544: Add system tests for delegation token based authentication e7ce0e7 is described below commit e7ce0e7e0a82bb56b1106479c0c8a7984fd92604 Author: Attila Sasvari AuthorDate: Mon Dec 3 11:28:36 2018 +0530 KAFKA-4544: Add system tests for delegation token based authentication This change adds some basic system tests for delegation token based authentication: - basic delegation token creation - producing with a delegation token - consuming with a delegation token - expiring a delegation token - producing with an expired delegation token New files: - delegation_tokens.py: a wrapper around kafka-delegation-tokens.sh - executed in container where a secure Broker is running (taking advantage of automatic cleanup) - delegation_tokens_test.py: basic test to validate the lifecycle of a delegation token Changes were made in the following file to extend their functionality: - config_property was updated to be able to configure Kafka brokers with delegation token related settings - jaas.conf template because a broker needs to support multiple login modules when delegation tokens are used - consule-consumer and verifiable_producer to override KAFKA_OPTS (to specify custom jaas.conf) and the client properties (to authenticate with delegation token). Author: Attila Sasvari Reviewers: Reviewers: Viktor Somogyi , Andras Katona <41361962+akatona84@users.noreply.github.com>, Manikumar Reddy Closes #5660 from asasvari/KAFKA-4544 --- tests/kafkatest/services/console_consumer.py | 31 ++++- tests/kafkatest/services/delegation_tokens.py | 102 ++++++++++++++++ tests/kafkatest/services/kafka/config_property.py | 6 + .../services/security/templates/jaas.conf | 6 +- tests/kafkatest/services/verifiable_producer.py | 41 ++++--- .../kafkatest/tests/core/delegation_token_test.py | 130 +++++++++++++++++++++ 6 files changed, 294 insertions(+), 22 deletions(-) diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 65c9fa5..dfbec9f 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -60,8 +60,9 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True, message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=DEV_BRANCH, client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None, - enable_systest_events=False, stop_timeout_sec=30, print_timestamp=False, - isolation_level="read_uncommitted", jaas_override_variables=None): + enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False, + isolation_level="read_uncommitted", jaas_override_variables=None, + kafka_opts_override="", client_prop_file_override=""): """ Args: context: standard context @@ -83,7 +84,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) print_timestamp if True, print each message's timestamp as well isolation_level How to handle transactional messages. jaas_override_variables A dict of variables to be used in the jaas.conf template file - + kafka_opts_override Override parameters of the KAFKA_OPTS environment variable + client_prop_file_override Override client.properties file used by the consumer """ JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), root=ConsoleConsumer.PERSISTENT_ROOT) @@ -116,6 +118,9 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) self.print_timestamp = print_timestamp self.jaas_override_variables = jaas_override_variables or {} + self.kafka_opts_override = kafka_opts_override + self.client_prop_file_override = client_prop_file_override + def prop_file(self, node): """Return a string which can be used to create a configuration file appropriate for the given node.""" @@ -134,6 +139,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) prop_file += str(self.security_config) return prop_file + def start_cmd(self, node): """Return the start command appropriate for the given node.""" args = self.args.copy() @@ -147,14 +153,19 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) args['jmx_port'] = self.jmx_port args['console_consumer'] = self.path.script("kafka-console-consumer.sh", node) args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) - args['kafka_opts'] = self.security_config.kafka_opts + + if self.kafka_opts_override: + args['kafka_opts'] = "\"%s\"" % self.kafka_opts_override + else: + args['kafka_opts'] = self.security_config.kafka_opts cmd = "export JMX_PORT=%(jmx_port)s; " \ "export LOG_DIR=%(log_dir)s; " \ "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \ "export KAFKA_OPTS=%(kafka_opts)s; " \ "%(console_consumer)s " \ - "--topic %(topic)s --consumer.config %(config_file)s" % args + "--topic %(topic)s " \ + "--consumer.config %(config_file)s " % args if self.new_consumer: assert node.version >= V_0_9_0_0, \ @@ -209,7 +220,15 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) # Create and upload config file self.logger.info("console_consumer.properties:") - prop_file = self.prop_file(node) + self.security_config = self.kafka.security_config.client_config(node=node, + jaas_override_variables=self.jaas_override_variables) + self.security_config.setup_node(node) + + if self.client_prop_file_override: + prop_file = self.client_prop_file_override + else: + prop_file = self.prop_file(node) + self.logger.info(prop_file) node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file) diff --git a/tests/kafkatest/services/delegation_tokens.py b/tests/kafkatest/services/delegation_tokens.py new file mode 100644 index 0000000..34da16b --- /dev/null +++ b/tests/kafkatest/services/delegation_tokens.py @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os.path +from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin + +""" +Delegation tokens is a tool to manage the lifecycle of delegation tokens. +All commands are executed on a secured Kafka node reusing its generated jaas.conf and krb5.conf. +""" + +class DelegationTokens(KafkaPathResolverMixin): + def __init__(self, kafka, context): + self.client_properties_content = """ +security.protocol=SASL_PLAINTEXT +sasl.kerberos.service.name=kafka +""" + self.context = context + self.command_path = self.path.script("kafka-delegation-tokens.sh") + self.kafka_opts = "KAFKA_OPTS=\"-Djava.security.auth.login.config=/mnt/security/jaas.conf " \ + "-Djava.security.krb5.conf=/mnt/security/krb5.conf\" " + self.kafka = kafka + self.bootstrap_server = " --bootstrap-server " + self.kafka.bootstrap_servers('SASL_PLAINTEXT') + self.base_cmd = self.kafka_opts + self.command_path + self.bootstrap_server + self.client_prop_path = os.path.join(self.kafka.PERSISTENT_ROOT, "client.properties") + self.jaas_deleg_conf_path = os.path.join(self.kafka.PERSISTENT_ROOT, "jaas_deleg.conf") + self.token_hmac_path = os.path.join(self.kafka.PERSISTENT_ROOT, "deleg_token_hmac.out") + self.delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, "delegation_token.out") + self.expire_delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, "expire_delegation_token.out") + self.renew_delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, "renew_delegation_token.out") + + self.node = self.kafka.nodes[0] + + def generate_delegation_token(self, maxlifetimeperiod=-1): + self.node.account.create_file(self.client_prop_path, self.client_properties_content) + + cmd = self.base_cmd + " --create" \ + " --max-life-time-period %s" \ + " --command-config %s > %s" % (maxlifetimeperiod, self.client_prop_path, self.delegation_token_out) + self.node.account.ssh(cmd, allow_fail=False) + + def expire_delegation_token(self, hmac): + cmd = self.base_cmd + " --expire" \ + " --expiry-time-period -1" \ + " --hmac %s" \ + " --command-config %s > %s" % (hmac, self.client_prop_path, self.expire_delegation_token_out) + self.node.account.ssh(cmd, allow_fail=False) + + def renew_delegation_token(self, hmac, renew_time_period=-1): + cmd = self.base_cmd + " --renew" \ + " --renew-time-period %s" \ + " --hmac %s" \ + " --command-config %s > %s" \ + % (renew_time_period, hmac, self.client_prop_path, self.renew_delegation_token_out) + return self.node.account.ssh_capture(cmd, allow_fail=False) + + def create_jaas_conf_with_delegation_token(self): + dt = self.parse_delegation_token_out() + jaas_deleg_content = """ +KafkaClient { + org.apache.kafka.common.security.scram.ScramLoginModule required + username="%s" + password="%s" + tokenauth=true; +}; +""" % (dt["tokenid"], dt["hmac"]) + self.node.account.create_file(self.jaas_deleg_conf_path, jaas_deleg_content) + + return jaas_deleg_content + + def token_hmac(self): + dt = self.parse_delegation_token_out() + return dt["hmac"] + + def parse_delegation_token_out(self): + cmd = "tail -1 %s" % self.delegation_token_out + + output_iter = self.node.account.ssh_capture(cmd, allow_fail=False) + output = "" + for line in output_iter: + output += line + + tokenid, hmac, owner, renewers, issuedate, expirydate, maxdate = output.split() + return {"tokenid" : tokenid, + "hmac" : hmac, + "owner" : owner, + "renewers" : renewers, + "issuedate" : issuedate, + "expirydate" :expirydate, + "maxdate" : maxdate} \ No newline at end of file diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py index 621b8e5..853bd83 100644 --- a/tests/kafkatest/services/kafka/config_property.py +++ b/tests/kafkatest/services/kafka/config_property.py @@ -50,6 +50,12 @@ REPLICA_HIGHWATERMARK_CHECKPOINT_INTERVAL_MS = "replica.high.watermark.checkpoin LOG_ROLL_TIME_MS = "log.roll.ms" OFFSETS_TOPIC_NUM_PARTITIONS = "offsets.topic.num.partitions" +DELEGATION_TOKEN_MAX_LIFETIME_MS="delegation.token.max.lifetime.ms" +DELEGATION_TOKEN_EXPIRY_TIME_MS="delegation.token.expiry.time.ms" +DELEGATION_TOKEN_MASTER_KEY="delegation.token.master.key" +SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms" + + """ From KafkaConfig.scala diff --git a/tests/kafkatest/services/security/templates/jaas.conf b/tests/kafkatest/services/security/templates/jaas.conf index e251145..3d6c93e 100644 --- a/tests/kafkatest/services/security/templates/jaas.conf +++ b/tests/kafkatest/services/security/templates/jaas.conf @@ -15,7 +15,7 @@ {% if static_jaas_conf %} KafkaClient { {% endif %} -{% if client_sasl_mechanism == "GSSAPI" %} +{% if "GSSAPI" in client_sasl_mechanism %} {% if is_ibm_jdk %} com.ibm.security.auth.module.Krb5LoginModule required debug=false credsType=both @@ -33,7 +33,7 @@ KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret"; -{% elif client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == "SCRAM-SHA-512" %} +{% elif "SCRAM-SHA-256" in client_sasl_mechanism or "SCRAM-SHA-512" in client_sasl_mechanism %} org.apache.kafka.common.security.scram.ScramLoginModule required username="{{ SecurityConfig.SCRAM_CLIENT_USER }}" password="{{ SecurityConfig.SCRAM_CLIENT_PASSWORD }}"; @@ -65,7 +65,7 @@ KafkaServer { user_client="client-secret" user_kafka="kafka-secret"; {% endif %} -{% if client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == "SCRAM-SHA-512" %} +{% if "SCRAM-SHA-256" in client_sasl_mechanism or "SCRAM-SHA-512" in client_sasl_mechanism %} org.apache.kafka.common.security.scram.ScramLoginModule required username="{{ SecurityConfig.SCRAM_BROKER_USER }}" password="{{ SecurityConfig.SCRAM_BROKER_PASSWORD }}"; diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 7fa2654..f339a62 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -58,18 +58,21 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None, stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", enable_idempotence=False, offline_nodes=[], create_time=-1, repeating_keys=None, - jaas_override_variables=None): + jaas_override_variables=None, kafka_opts_override="", client_prop_file_override=""): """ - :param max_messages is a number of messages to be produced per producer - :param message_validator checks for an expected format of messages produced. There are - currently two: - * is_int is an integer format; this is default and expected to be used if - num_nodes = 1 - * is_int_with_prefix recommended if num_nodes > 1, because otherwise each producer - will produce exactly same messages, and validation may miss missing messages. - :param compression_types: If None, all producers will not use compression; or a list of - compression types, one per producer (could be "none"). - :param jaas_override_variables: A dict of variables to be used in the jaas.conf template file + Args: + :param max_messages number of messages to be produced per producer + :param message_validator checks for an expected format of messages produced. There are + currently two: + * is_int is an integer format; this is default and expected to be used if + num_nodes = 1 + * is_int_with_prefix recommended if num_nodes > 1, because otherwise each producer + will produce exactly same messages, and validation may miss missing messages. + :param compression_types If None, all producers will not use compression; or a list of compression types, + one per producer (could be "none"). + :param jaas_override_variables A dict of variables to be used in the jaas.conf template file + :param kafka_opts_override Override parameters of the KAFKA_OPTS environment variable + :param client_prop_file_override Override client.properties file used by the consumer """ super(VerifiableProducer, self).__init__(context, num_nodes) self.log_level = log_level @@ -97,6 +100,9 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou self.create_time = create_time self.repeating_keys = repeating_keys self.jaas_override_variables = jaas_override_variables or {} + self.kafka_opts_override = kafka_opts_override + self.client_prop_file_override = client_prop_file_override + def java_class_name(self): return "VerifiableProducer" @@ -125,7 +131,11 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou self.security_config.setup_node(node) # Create and upload config file - producer_prop_file = self.prop_file(node) + if self.client_prop_file_override: + producer_prop_file = self.client_prop_file_override + else: + producer_prop_file = self.prop_file(node) + if self.acks is not None: self.logger.info("VerifiableProducer (index = %d) will use acks = %s", idx, self.acks) producer_prop_file += "\nacks=%s\n" % self.acks @@ -189,7 +199,11 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou def start_cmd(self, node, idx): cmd = "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR - cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts + if self.kafka_opts_override: + cmd += " export KAFKA_OPTS=\"%s\";" % self.kafka_opts_override + else: + cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts + cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG cmd += self.impl.exec_cmd(node) cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol, True, self.offline_nodes)) @@ -207,6 +221,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou cmd += " --repeating-keys %s " % str(self.repeating_keys) cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE + cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE) return cmd diff --git a/tests/kafkatest/tests/core/delegation_token_test.py b/tests/kafkatest/tests/core/delegation_token_test.py new file mode 100644 index 0000000..0b2b6eb --- /dev/null +++ b/tests/kafkatest/tests/core/delegation_token_test.py @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.tests.test import Test +from ducktape.utils.util import wait_until +from kafkatest.services.kafka import config_property, KafkaService +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.delegation_tokens import DelegationTokens +from kafkatest.services.verifiable_producer import VerifiableProducer + +from datetime import datetime +import time + +""" +Basic tests to validate delegation token support +""" +class DelegationTokenTest(Test): + def __init__(self, test_context): + super(DelegationTokenTest, self).__init__(test_context) + + self.test_context = test_context + self.topic = "topic" + self.zk = ZookeeperService(test_context, num_nodes=1) + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka", + topics={self.topic: {"partitions": 1, "replication-factor": 1}}, + server_prop_overides=[ + [config_property.DELEGATION_TOKEN_MAX_LIFETIME_MS, "604800000"], + [config_property.DELEGATION_TOKEN_EXPIRY_TIME_MS, "86400000"], + [config_property.DELEGATION_TOKEN_MASTER_KEY, "test12345"], + [config_property.SASL_ENABLED_MECHANISMS, "GSSAPI,SCRAM-SHA-256"] + ]) + self.jaas_deleg_conf_path = "/tmp/jaas_deleg.conf" + self.jaas_deleg_conf = "" + self.client_properties_content = """ +security.protocol=SASL_PLAINTEXT +sasl.mechanism=SCRAM-SHA-256 +sasl.kerberos.service.name=kafka +client.id=console-consumer +""" + self.client_kafka_opts=' -Djava.security.auth.login.config=' + self.jaas_deleg_conf_path + + self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, max_messages=1, + throughput=1, kafka_opts_override=self.client_kafka_opts, + client_prop_file_override=self.client_properties_content) + + self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, + kafka_opts_override=self.client_kafka_opts, + client_prop_file_override=self.client_properties_content) + + self.kafka.security_protocol = 'SASL_PLAINTEXT' + self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256' + self.kafka.interbroker_sasl_mechanism = 'GSSAPI' + + + def setUp(self): + self.zk.start() + + def tearDown(self): + self.producer.nodes[0].account.remove(self.jaas_deleg_conf_path) + self.consumer.nodes[0].account.remove(self.jaas_deleg_conf_path) + + def generate_delegation_token(self): + self.logger.debug("Request delegation token") + self.delegation_tokens.generate_delegation_token() + self.jaas_deleg_conf = self.delegation_tokens.create_jaas_conf_with_delegation_token() + + def expire_delegation_token(self): + self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256' + token_hmac = self.delegation_tokens.token_hmac() + self.delegation_tokens.expire_delegation_token(token_hmac) + + + def produce_with_delegation_token(self): + self.producer.acked_values = [] + self.producer.nodes[0].account.create_file(self.jaas_deleg_conf_path, self.jaas_deleg_conf) + self.logger.debug(self.jaas_deleg_conf) + self.producer.start() + + def consume_with_delegation_token(self): + self.logger.debug("Consume messages with delegation token") + + self.consumer.nodes[0].account.create_file(self.jaas_deleg_conf_path, self.jaas_deleg_conf) + self.logger.debug(self.jaas_deleg_conf) + self.consumer.consumer_timeout_ms = 5000 + + self.consumer.start() + self.consumer.wait() + + def get_datetime_ms(self, input_date): + return int(time.mktime(datetime.strptime(input_date,"%Y-%m-%dT%H:%M").timetuple()) * 1000) + + def renew_delegation_token(self): + dt = self.delegation_tokens.parse_delegation_token_out() + orig_expiry_date_ms = self.get_datetime_ms(dt["expirydate"]) + new_expirydate_ms = orig_expiry_date_ms + 1000 + + self.delegation_tokens.renew_delegation_token(dt["hmac"], new_expirydate_ms) + + def test_delegation_token_lifecycle(self): + self.kafka.start() + self.delegation_tokens = DelegationTokens(self.kafka, self.test_context) + + self.generate_delegation_token() + self.renew_delegation_token() + self.produce_with_delegation_token() + wait_until(lambda: self.producer.num_acked > 0, timeout_sec=30, + err_msg="Expected producer to still be producing.") + assert 1 == self.producer.num_acked, "number of acked messages: %d" % self.producer.num_acked + + self.consume_with_delegation_token() + num_consumed = len(self.consumer.messages_consumed[1]) + assert 1 == num_consumed, "number of consumed messages: %d" % num_consumed + + self.expire_delegation_token() + + self.produce_with_delegation_token() + assert 0 == self.producer.num_acked, "number of acked messages: %d" % self.producer.num_acked \ No newline at end of file