kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-4548; Add ClientCompatibilityTest to verify features against older brokers
Date Tue, 24 Jan 2017 20:07:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7837d3e54 -> 3df60e7eb


KAFKA-4548; Add ClientCompatibilityTest to verify features against older brokers

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2262 from cmccabe/KAFKA-4548


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3df60e7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3df60e7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3df60e7e

Branch: refs/heads/trunk
Commit: 3df60e7eb8b6fb38cd560708f5b909a9b2e61de2
Parents: 7837d3e
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Tue Jan 24 20:06:35 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Jan 24 20:07:13 2017 +0000

----------------------------------------------------------------------
 .../client_compatibility_features_test.py       | 100 ++++++
 ...client_compatibility_produce_consume_test.py |  75 ++++
 .../client/test_producer_consumer_compat.py     |  75 ----
 .../kafka/tools/ClientCompatibilityTest.java    | 359 +++++++++++++++++++
 4 files changed, 534 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3df60e7e/tests/kafkatest/tests/client/client_compatibility_features_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py
new file mode 100644
index 0000000..3c96b84
--- /dev/null
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import time
+from random import randint
+
+from ducktape.mark import parametrize
+from ducktape.tests.test import TestContext
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from ducktape.tests.test import Test
+from kafkatest.version import TRUNK, LATEST_0_10_0, LATEST_0_10_1, V_0_10_1_0, KafkaVersion
+
+def get_broker_features(broker_version):
+    features = {}
+    if (broker_version < V_0_10_1_0):
+        features["offsets-for-times-supported"] = False
+        features["cluster-id-supported"] = False
+    else:
+        features["offsets-for-times-supported"] = True
+        features["cluster-id-supported"] = True
+    return features
+
+def run_command(node, cmd, ssh_log_file):
+    with open(ssh_log_file, 'w') as f:
+        f.write("Running %s\n" % cmd)
+        try:
+            for line in node.account.ssh_capture(cmd):
+                f.write(line)
+        except Exception as e:
+            f.write("** Command failed!")
+            print e
+            raise e
+
+
+class ClientCompatibilityFeaturesTest(Test):
+    """
+    Tests clients for the presence or absence of specific features when communicating with brokers with various
+    versions. Relies on ClientCompatibilityTest.java for much of the functionality.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ClientCompatibilityFeaturesTest, self).__init__(test_context=test_context)
+
+        self.zk = ZookeeperService(test_context, num_nodes=3)
+
+        # Generate a unique topic name
+        topic_name = "client_compat_features_topic_%d%d" % (int(time.time()), randint(0, 2147483647))
+        self.topics = { topic_name: {
+            "partitions": 1, # Use only one partition to avoid worrying about ordering
+            "replication-factor": 3
+            }}
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics=self.topics)
+
+    def invoke_compatibility_program(self, features):
+        # Run the compatibility test on the first Kafka node.
+        node = self.zk.nodes[0]
+        cmd = ("%s org.apache.kafka.tools.ClientCompatibilityTest "
+               "--bootstrap-server %s "
+               "--offsets-for-times-supported %s "
+               "--cluster-id-supported %s "
+               "--topic %s " % (self.zk.path.script("kafka-run-class.sh", node),
+                               self.kafka.bootstrap_servers(),
+                               features["offsets-for-times-supported"],
+                               features["cluster-id-supported"],
+                               self.topics.keys()[0]))
+        results_dir = TestContext.results_dir(self.test_context, 0)
+        os.makedirs(results_dir)
+        ssh_log_file = "%s/%s" % (results_dir, "client_compatibility_test_output.txt")
+        try:
+          self.logger.info("Running %s" % cmd)
+          run_command(node, cmd, ssh_log_file)
+        except Exception as e:
+          self.logger.info("** Command failed.  See %s for log messages." % ssh_log_file)
+          raise e
+
+    @parametrize(broker_version=str(TRUNK))
+    @parametrize(broker_version=str(LATEST_0_10_0))
+    @parametrize(broker_version=str(LATEST_0_10_1))
+    def run_compatibility_test(self, broker_version):
+        self.zk.start()
+        self.kafka.set_version(KafkaVersion(broker_version))
+        self.kafka.start()
+        features = get_broker_features(broker_version)
+        self.invoke_compatibility_program(features)
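
For reference, the command that invoke_compatibility_program assembles and runs over SSH has roughly
the following shape (the script path, broker address, and generated topic name here are illustrative
placeholders, not values taken from the commit):

    bin/kafka-run-class.sh org.apache.kafka.tools.ClientCompatibilityTest \
        --bootstrap-server worker1:9092 \
        --offsets-for-times-supported True \
        --cluster-id-supported True \
        --topic client_compat_features_topic_1485288000042

The two feature flags are the stringified Python booleans returned by get_broker_features, so a broker
older than 0.10.1.0 is exercised with both flags set to False.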

http://git-wip-us.apache.org/repos/asf/kafka/blob/3df60e7e/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
new file mode 100644
index 0000000..35b904b
--- /dev/null
+++ b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int_with_prefix
+from kafkatest.version import TRUNK, LATEST_0_10_0, LATEST_0_10_1, KafkaVersion
+
+class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
+    """
+    These tests validate that we can use a new client to produce and consume from older brokers.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ClientCompatibilityProduceConsumeTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=3)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic:{
+                                                                    "partitions": 10,
+                                                                    "replication-factor": 2}})
+        self.num_partitions = 10
+        self.timeout_sec = 60
+        self.producer_throughput = 1000
+        self.num_producers = 2
+        self.messages_per_producer = 1000
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    def min_cluster_size(self):
+        # Override this since we're adding services outside of the constructor
+        return super(ClientCompatibilityProduceConsumeTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    @parametrize(broker_version=str(TRUNK))
+    @parametrize(broker_version=str(LATEST_0_10_0))
+    @parametrize(broker_version=str(LATEST_0_10_1))
+    def test_produce_consume(self, broker_version):
+        print("running producer_consumer_compat with broker_version = %s" % broker_version)
+        self.kafka.set_version(KafkaVersion(broker_version))
+        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.interbroker_security_protocol = self.kafka.security_protocol
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int_with_prefix)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
+                                        consumer_timeout_ms=60000,
+                                        message_validator=is_int_with_prefix)
+        self.kafka.start()
+
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+
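
A single parametrized case of the new produce/consume test can be selected with ducktape's
path::Class.method syntax; the invocation below is a hedged sketch of typical ducktape usage
(it assumes a working system-test environment, and the --parameters flag and version string
are illustrative):

    ducktape tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py::ClientCompatibilityProduceConsumeTest.test_produce_consume \
        --parameters '{"broker_version": "0.10.1.1"}'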

http://git-wip-us.apache.org/repos/asf/kafka/blob/3df60e7e/tests/kafkatest/tests/client/test_producer_consumer_compat.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/test_producer_consumer_compat.py b/tests/kafkatest/tests/client/test_producer_consumer_compat.py
deleted file mode 100644
index 5a14fc5..0000000
--- a/tests/kafkatest/tests/client/test_producer_consumer_compat.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int_with_prefix
-from kafkatest.version import TRUNK, V_0_10_0_0, KafkaVersion
-
-class TestProducerConsumerCompat(ProduceConsumeValidateTest):
-    """
-    These tests validate that we can use a new client to consume from older
-    brokers.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(TestProducerConsumerCompat, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=3)
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic:{
-                                                                    "partitions": 10,
-                                                                    "replication-factor": 2}})
-        self.num_partitions = 10
-        self.timeout_sec = 60
-        self.producer_throughput = 1000
-        self.num_producers = 2
-        self.messages_per_producer = 1000
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
-    def min_cluster_size(self):
-        # Override this since we're adding services outside of the constructor
-        return super(TestProducerConsumerCompat, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-    # TODO: when KAFKA-4462 is fully implemented, we should test other versions here.
-    @parametrize(broker_version=str(TRUNK))
-    def test_produce_consume(self, broker_version):
-        print("running producer_consumer_compat with broker_version = %s" % broker_version)
-        self.kafka.set_version(KafkaVersion(broker_version))
-        self.kafka.security_protocol = "PLAINTEXT"
-        self.kafka.interbroker_security_protocol = self.kafka.security_protocol
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic, throughput=self.producer_throughput,
-                                           message_validator=is_int_with_prefix)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
-                                        consumer_timeout_ms=60000,
-                                        message_validator=is_int_with_prefix)
-        self.kafka.start()
-
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/3df60e7e/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
new file mode 100644
index 0000000..69202a0
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ObsoleteBrokerException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+/**
+ * ClientCompatibilityTest is invoked by the ducktape test client_compatibility_features_test.py to validate
+ * client behavior when various broker versions are in use.  It runs various client operations and tests whether they
+ * are supported or not.
+ */
+public class ClientCompatibilityTest {
+    private static final Logger log = LoggerFactory.getLogger(ClientCompatibilityTest.class);
+
+    static class TestConfig {
+        final String bootstrapServer;
+        final String topic;
+        final boolean offsetsForTimesSupported;
+        final boolean expectClusterId;
+
+        TestConfig(Namespace res) {
+            this.bootstrapServer = res.getString("bootstrapServer");
+            this.topic = res.getString("topic");
+            this.offsetsForTimesSupported = res.getBoolean("offsetsForTimesSupported");
+            this.expectClusterId = res.getBoolean("clusterIdSupported");
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        ArgumentParser parser = ArgumentParsers
+            .newArgumentParser("client-compatibility-test")
+            .defaultHelp(true)
+            .description("This tool is used to verify client compatibility guarantees.");
+        parser.addArgument("--topic")
+            .action(store())
+            .required(true)
+            .type(String.class)
+            .dest("topic")
+            .metavar("TOPIC")
+            .help("the compatibility test will produce messages to this topic");
+        parser.addArgument("--bootstrap-server")
+            .action(store())
+            .required(true)
+            .type(String.class)
+            .dest("bootstrapServer")
+            .metavar("BOOTSTRAP_SERVER")
+            .help("The server(s) to use for bootstrapping");
+        parser.addArgument("--offsets-for-times-supported")
+            .action(store())
+            .required(true)
+            .type(Boolean.class)
+            .dest("offsetsForTimesSupported")
+            .metavar("OFFSETS_FOR_TIMES_SUPPORTED")
+            .help("True if KafkaConsumer#offsetsForTimes is supported by the current broker
version");
+        parser.addArgument("--cluster-id-supported")
+            .action(store())
+            .required(true)
+            .type(Boolean.class)
+            .dest("clusterIdSupported")
+            .metavar("CLUSTER_ID_SUPPORTED")
+            .help("True if cluster IDs are supported.  False if cluster ID always appears
as null.");
+        Namespace res = null;
+        try {
+            res = parser.parseArgs(args);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                System.exit(0);
+            } else {
+                parser.handleError(e);
+                System.exit(1);
+            }
+        }
+        TestConfig testConfig = new TestConfig(res);
+        ClientCompatibilityTest test = new ClientCompatibilityTest(testConfig);
+        try {
+            test.run();
+        } catch (Throwable t) {
+            System.out.printf("FAILED: Caught exception %s\n\n", t.getMessage());
+            t.printStackTrace();
+            System.exit(1);
+        }
+        System.out.println("SUCCESS.");
+        System.exit(0);
+    }
+
+    private static byte[] asByteArray(long a) {
+        ByteBuffer buf = ByteBuffer.allocate(8);
+        buf.putLong(a);
+        return buf.array();
+    }
+
+    private static byte[] asByteArray(long a, long b) {
+        ByteBuffer buf = ByteBuffer.allocate(16);
+        buf.putLong(a);
+        buf.putLong(b);
+        return buf.array();
+    }
+
+    private static String toHexString(byte[] buf) {
+        StringBuilder bld = new StringBuilder();
+        for (byte b : buf) {
+            bld.append(String.format("%02x", b));
+        }
+        return bld.toString();
+    }
+
+    private static void compareArrays(byte[] a, byte[] b) {
+        if (!Arrays.equals(a, b)) {
+            throw new RuntimeException("Arrays did not match: expected " + toHexString(a)
+ ", got " + toHexString(b));
+        }
+    }
+
+    private final TestConfig testConfig;
+
+    private final byte[] message1;
+
+    private final byte[] message2;
+
+    ClientCompatibilityTest(TestConfig testConfig) {
+        this.testConfig = testConfig;
+        long curTime = Time.SYSTEM.milliseconds();
+        this.message1 = asByteArray(curTime);
+        this.message2 = asByteArray(curTime, curTime);
+    }
+
+    void run() throws Exception {
+        long prodTimeMs = Time.SYSTEM.milliseconds();
+        testProduce();
+        testConsume(prodTimeMs);
+    }
+
+    public void testProduce() throws Exception {
+        Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
+        ByteArraySerializer serializer = new ByteArraySerializer();
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, serializer, serializer);
+        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(testConfig.topic, message1);
+        Future<RecordMetadata> future1 = producer.send(record1);
+        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(testConfig.topic, message2);
+        Future<RecordMetadata> future2 = producer.send(record2);
+        producer.flush();
+        future1.get();
+        future2.get();
+        producer.close();
+    }
+
+    private static class OffsetsForTime {
+        Map<TopicPartition, OffsetAndTimestamp> result;
+
+        @Override
+        public String toString() {
+            return Utils.mkString(result);
+        }
+    }
+
+    public static class ClientCompatibilityTestDeserializer implements Deserializer<byte[]>, ClusterResourceListener {
+        private final boolean expectClusterId;
+
+        ClientCompatibilityTestDeserializer(boolean expectClusterId) {
+            this.expectClusterId = expectClusterId;
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+            // nothing to do
+        }
+
+        @Override
+        public byte[] deserialize(String topic, byte[] data) {
+            return data;
+        }
+
+        @Override
+        public void close() {
+            // nothing to do
+        }
+
+        @Override
+        public void onUpdate(ClusterResource clusterResource) {
+            if (expectClusterId) {
+                if (clusterResource.clusterId() == null) {
+                    throw new RuntimeException("Expected cluster id to be supported, but
it was null.");
+                }
+            } else {
+                if (clusterResource.clusterId() != null) {
+                    throw new RuntimeException("Expected cluster id to be null, but it was
supported.");
+                }
+            }
+        }
+    }
+
+    public void testConsume(final long prodTimeMs) throws Exception {
+        Properties consumerProps = new Properties();
+        consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
+        ClientCompatibilityTestDeserializer deserializer =
+            new ClientCompatibilityTestDeserializer(testConfig.expectClusterId);
+        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);
+        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(testConfig.topic);
+        if (partitionInfos.size() < 1)
+            throw new RuntimeException("Expected at least one partition for topic " + testConfig.topic);
+        final Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
+        final LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
+        for (PartitionInfo partitionInfo : partitionInfos) {
+            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
+            timestampsToSearch.put(topicPartition, prodTimeMs);
+            topicPartitions.add(topicPartition);
+        }
+        final OffsetsForTime offsetsForTime = new OffsetsForTime();
+        tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
+                    }
+                },
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        log.info("offsetsForTime = {}", offsetsForTime.result);
+                    }
+                });
+        // Whether or not offsetsForTimes works, beginningOffsets and endOffsets
+        // should work.
+        consumer.beginningOffsets(timestampsToSearch.keySet());
+        consumer.endOffsets(timestampsToSearch.keySet());
+
+        consumer.assign(topicPartitions);
+        consumer.seekToBeginning(topicPartitions);
+        final Iterator<byte[]> iter = new Iterator<byte[]>() {
+            private final int timeoutMs = 10000;
+            private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = null;
+            private byte[] next = null;
+
+            private byte[] fetchNext() {
+                while (true) {
+                    long curTime = Time.SYSTEM.milliseconds();
+                    if (curTime - prodTimeMs > timeoutMs)
+                        throw new RuntimeException("Timed out after " + timeoutMs + " ms.");
+                    if (recordIter == null) {
+                        ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+                        recordIter = records.iterator();
+                    }
+                    if (recordIter.hasNext())
+                        return recordIter.next().value();
+                    recordIter = null;
+                }
+            }
+
+            @Override
+            public boolean hasNext() {
+                if (next != null)
+                    return true;
+                next = fetchNext();
+                return next != null;
+            }
+
+            @Override
+            public byte[] next() {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+                byte[] cur = next;
+                next = null;
+                return cur;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+
+        byte[] next = iter.next();
+        try {
+            compareArrays(message1, next);
+            log.debug("Found first message...");
+        } catch (RuntimeException e) {
+            throw new RuntimeException("The first message in this topic was not ours. Please
use a new topic when " +
+                    "running this program.");
+        }
+        next = iter.next();
+        try {
+            compareArrays(message2, next);
+            log.debug("Found second message...");
+        } catch (RuntimeException e) {
+            throw new RuntimeException("The second message in this topic was not ours. Please
use a new topic when " +
+                    "running this program.");
+        }
+        log.debug("Closing consumer.");
+        consumer.close();
+        log.info("Closed consumer.");
+    }
+
+    private void tryFeature(String featureName, boolean supported, Runnable invoker, Runnable resultTester) {
+        try {
+            invoker.run();
+            log.info("Successfully used feature {}", featureName);
+        } catch (ObsoleteBrokerException e) {
+            log.info("Got ObsoleteBrokerException when attempting to use feature {}", featureName);
+            if (supported) {
+                throw new RuntimeException("Expected " + featureName + " to be supported,
but it wasn't.", e);
+            }
+            return;
+        }
+        if (!supported) {
+            throw new RuntimeException("Did not expect " + featureName + " to be supported,
but it was.");
+        }
+        resultTester.run();
+    }
+}
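
The Java tool can also be run by hand. Against a 0.10.0 broker, get_broker_features expects
offsetsForTimes to be rejected and the cluster id to be null, so both flags would be passed as
false (the broker address and topic below are assumptions for illustration only):

    bin/kafka-run-class.sh org.apache.kafka.tools.ClientCompatibilityTest \
        --bootstrap-server older-broker:9092 \
        --offsets-for-times-supported false \
        --cluster-id-supported false \
        --topic compat_test_topic

Per main(), the tool prints "SUCCESS." and exits with status 0 when every probed feature behaves
as declared, and prints "FAILED:" with a stack trace and exits with status 1 otherwise.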

