eagle-commits mailing list archives

From: yonzhang2...@apache.org
Subject: [3/7] incubator-eagle git commit: EAGLE-120 EAGLE-100 initial system and hadoop metric https://issues.apache.org/jira/browse/EAGLE-120 Author: qingwen220 qingwzhao@ebay.com Reviewer: yonzhang2012 yonzhang2012@apache.org C
Date: Wed, 13 Jan 2016 01:08:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_consumer_integration.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_consumer_integration.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_consumer_integration.py
new file mode 100644
index 0000000..4723220
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_consumer_integration.py
@@ -0,0 +1,401 @@
+import logging
+import os
+
+from six.moves import xrange
+
+from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
+from kafka.common import (
+    ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
+)
+from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import (
+    KafkaIntegrationTestCase, kafka_versions, random_string, Timer
+)
+
+class TestConsumerIntegration(KafkaIntegrationTestCase):
+    @classmethod
+    def setUpClass(cls):
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
+        cls.zk = ZookeeperFixture.instance()
+        cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+        cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
+
+        cls.server = cls.server1 # Bootstrapping server
+
+    @classmethod
+    def tearDownClass(cls):
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
+        cls.server1.close()
+        cls.server2.close()
+        cls.zk.close()
+
+    def send_messages(self, partition, messages):
+        messages = [ create_message(self.msg(str(msg))) for msg in messages ]
+        produce = ProduceRequest(self.topic, partition, messages = messages)
+        resp, = self.client.send_produce_request([produce])
+        self.assertEqual(resp.error, 0)
+
+        return [ x.value for x in messages ]
+
+    def assert_message_count(self, messages, num_messages):
+        # Make sure we got them all
+        self.assertEqual(len(messages), num_messages)
+
+        # Make sure there are no duplicates
+        self.assertEqual(len(set(messages)), num_messages)
+
+    def consumer(self, **kwargs):
+        if os.environ['KAFKA_VERSION'] == "0.8.0":
+            # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
+            kwargs['auto_commit'] = False
+        else:
+            kwargs.setdefault('auto_commit', True)
+
+        consumer_class = kwargs.pop('consumer', SimpleConsumer)
+        group = kwargs.pop('group', self.id().encode('utf-8'))
+        topic = kwargs.pop('topic', self.topic)
+
+        if consumer_class == SimpleConsumer:
+            kwargs.setdefault('iter_timeout', 0)
+
+        return consumer_class(self.client, group, topic, **kwargs)
+
+    def kafka_consumer(self, **configs):
+        brokers = '%s:%d' % (self.server.host, self.server.port)
+        consumer = KafkaConsumer(self.topic,
+                                 metadata_broker_list=brokers,
+                                 **configs)
+        return consumer
+
+    @kafka_versions("all")
+    def test_simple_consumer(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # Start a consumer
+        consumer = self.consumer()
+
+        self.assert_message_count([ message for message in consumer ], 200)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_simple_consumer__seek(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        consumer = self.consumer()
+
+        # Rewind 10 messages from the end
+        consumer.seek(-10, 2)
+        self.assert_message_count([ message for message in consumer ], 10)
+
+        # Rewind 13 messages from the end
+        consumer.seek(-13, 2)
+        self.assert_message_count([ message for message in consumer ], 13)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_simple_consumer_blocking(self):
+        consumer = self.consumer()
+
+        # Nothing in the queue: get_messages should block for the full 5 second timeout
+        with Timer() as t:
+            messages = consumer.get_messages(block=True, timeout=5)
+            self.assert_message_count(messages, 0)
+        self.assertGreaterEqual(t.interval, 5)
+
+        self.send_messages(0, range(0, 10))
+
+        # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
+        with Timer() as t:
+            messages = consumer.get_messages(count=5, block=True, timeout=5)
+            self.assert_message_count(messages, 5)
+        self.assertLessEqual(t.interval, 1)
+
+        # Ask for 10 messages, get 5 back, block 5 seconds
+        with Timer() as t:
+            messages = consumer.get_messages(count=10, block=True, timeout=5)
+            self.assert_message_count(messages, 5)
+        self.assertGreaterEqual(t.interval, 5)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_simple_consumer_pending(self):
+        # make sure that we start with no pending messages
+        consumer = self.consumer()
+        self.assertEqual(consumer.pending(), 0)
+        self.assertEqual(consumer.pending(partitions=[0]), 0)
+        self.assertEqual(consumer.pending(partitions=[1]), 0)
+
+        # Produce 10 messages to partitions 0 and 1
+        self.send_messages(0, range(0, 10))
+        self.send_messages(1, range(10, 20))
+
+        consumer = self.consumer()
+
+        self.assertEqual(consumer.pending(), 20)
+        self.assertEqual(consumer.pending(partitions=[0]), 10)
+        self.assertEqual(consumer.pending(partitions=[1]), 10)
+
+        # move to last message, so one partition should have 1 pending
+        # message and other 0
+        consumer.seek(-1, 2)
+        self.assertEqual(consumer.pending(), 1)
+
+        pending_part1 = consumer.pending(partitions=[0])
+        pending_part2 = consumer.pending(partitions=[1])
+        self.assertEqual(set([0, 1]), set([pending_part1, pending_part2]))
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_multi_process_consumer(self):
+        # Produce 100 messages to partitions 0 and 1
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        consumer = self.consumer(consumer = MultiProcessConsumer)
+
+        self.assert_message_count([ message for message in consumer ], 200)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_multi_process_consumer_blocking(self):
+        consumer = self.consumer(consumer = MultiProcessConsumer)
+
+        # No messages in the queue: get_messages should block for the full 5 second timeout
+        with Timer() as t:
+            messages = consumer.get_messages(block=True, timeout=5)
+            self.assert_message_count(messages, 0)
+
+        self.assertGreaterEqual(t.interval, 5)
+
+        # Send 10 messages
+        self.send_messages(0, range(0, 10))
+
+        # Ask for 5 messages, 10 messages in queue, block 0 seconds
+        with Timer() as t:
+            messages = consumer.get_messages(count=5, block=True, timeout=5)
+            self.assert_message_count(messages, 5)
+        self.assertLessEqual(t.interval, 1)
+
+        # Ask for 10 messages, 5 in queue, block 5 seconds
+        with Timer() as t:
+            messages = consumer.get_messages(count=10, block=True, timeout=5)
+            self.assert_message_count(messages, 5)
+        self.assertGreaterEqual(t.interval, 4.95)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_multi_proc_pending(self):
+        self.send_messages(0, range(0, 10))
+        self.send_messages(1, range(10, 20))
+
+        consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
+
+        self.assertEqual(consumer.pending(), 20)
+        self.assertEqual(consumer.pending(partitions=[0]), 10)
+        self.assertEqual(consumer.pending(partitions=[1]), 10)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_large_messages(self):
+        # Produce 10 "normal" size messages
+        small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
+
+        # Produce 10 messages that are large (bigger than default fetch size)
+        large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ])
+
+        # Consumer should still get all of them
+        consumer = self.consumer()
+
+        expected_messages = set(small_messages + large_messages)
+        actual_messages = set([ x.message.value for x in consumer ])
+        self.assertEqual(expected_messages, actual_messages)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_huge_messages(self):
+        huge_message, = self.send_messages(0, [
+            create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
+        ])
+
+        # Create a consumer with the default buffer size
+        consumer = self.consumer()
+
+        # This consumer fails to get the message because its fetch buffer is too small
+        with self.assertRaises(ConsumerFetchSizeTooSmall):
+            consumer.get_message(False, 0.1)
+
+        consumer.stop()
+
+        # Create a consumer with no fetch size limit
+        big_consumer = self.consumer(
+            max_buffer_size = None,
+            partitions = [0],
+        )
+
+        # Seek to the last message
+        big_consumer.seek(-1, 2)
+
+        # Consume giant message successfully
+        message = big_consumer.get_message(block=False, timeout=10)
+        self.assertIsNotNone(message)
+        self.assertEqual(message.message.value, huge_message)
+
+        big_consumer.stop()
+
+    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+    def test_offset_behavior__resuming_behavior(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # Start a consumer
+        consumer1 = self.consumer(
+            auto_commit_every_t = None,
+            auto_commit_every_n = 20,
+        )
+
+        # Grab the first 195 messages
+        output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ]
+        self.assert_message_count(output_msgs1, 195)
+
+        # The total offset across both partitions should be at 180
+        consumer2 = self.consumer(
+            auto_commit_every_t = None,
+            auto_commit_every_n = 20,
+        )
+
+        # 181-200
+        self.assert_message_count([ message for message in consumer2 ], 20)
+
+        consumer1.stop()
+        consumer2.stop()
+
+    # TODO: Make this a unit test -- should not require integration
+    @kafka_versions("all")
+    def test_fetch_buffer_size(self):
+
+        # Test parameters (see issue 135 / PR 136)
+        TEST_MESSAGE_SIZE=1048
+        INIT_BUFFER_SIZE=1024
+        MAX_BUFFER_SIZE=2048
+        assert TEST_MESSAGE_SIZE > INIT_BUFFER_SIZE
+        assert TEST_MESSAGE_SIZE < MAX_BUFFER_SIZE
+        assert MAX_BUFFER_SIZE == 2 * INIT_BUFFER_SIZE
+
+        self.send_messages(0, [ "x" * 1048 ])
+        self.send_messages(1, [ "x" * 1048 ])
+
+        consumer = self.consumer(buffer_size=1024, max_buffer_size=2048)
+        messages = [ message for message in consumer ]
+        self.assertEqual(len(messages), 2)
+
+    @kafka_versions("all")
+    def test_kafka_consumer(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # Start a consumer
+        consumer = self.kafka_consumer(auto_offset_reset='smallest',
+                                       consumer_timeout_ms=5000)
+        n = 0
+        messages = {0: set(), 1: set()}
+        logging.debug("kafka consumer offsets: %s" % consumer.offsets())
+        for m in consumer:
+            logging.debug("Consumed message %s" % repr(m))
+            n += 1
+            messages[m.partition].add(m.offset)
+            if n >= 200:
+                break
+
+        self.assertEqual(len(messages[0]), 100)
+        self.assertEqual(len(messages[1]), 100)
+
+    @kafka_versions("all")
+    def test_kafka_consumer__blocking(self):
+        TIMEOUT_MS = 500
+        consumer = self.kafka_consumer(auto_offset_reset='smallest',
+                                       consumer_timeout_ms=TIMEOUT_MS)
+
+        # Nothing in the queue: consumer.next() should block for the full timeout
+        with Timer() as t:
+            with self.assertRaises(ConsumerTimeout):
+                msg = consumer.next()
+        self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
+
+        self.send_messages(0, range(0, 10))
+
+        # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
+        messages = set()
+        with Timer() as t:
+            for i in range(5):
+                msg = consumer.next()
+                messages.add((msg.partition, msg.offset))
+        self.assertEqual(len(messages), 5)
+        self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
+
+        # Ask for 10 messages, get 5 back, then block until the timeout expires
+        messages = set()
+        with Timer() as t:
+            with self.assertRaises(ConsumerTimeout):
+                for i in range(10):
+                    msg = consumer.next()
+                    messages.add((msg.partition, msg.offset))
+        self.assertEqual(len(messages), 5)
+        self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
+
+    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+    def test_kafka_consumer__offset_commit_resume(self):
+        GROUP_ID = random_string(10)
+
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # Start a consumer
+        consumer1 = self.kafka_consumer(
+            group_id = GROUP_ID,
+            auto_commit_enable = True,
+            auto_commit_interval_ms = None,
+            auto_commit_interval_messages = 20,
+            auto_offset_reset='smallest',
+        )
+
+        # Grab the first 195 messages
+        output_msgs1 = []
+        for _ in xrange(195):
+            m = consumer1.next()
+            output_msgs1.append(m)
+            consumer1.task_done(m)
+        self.assert_message_count(output_msgs1, 195)
+
+        # The total offset across both partitions should be at 180
+        consumer2 = self.kafka_consumer(
+            group_id = GROUP_ID,
+            auto_commit_enable = True,
+            auto_commit_interval_ms = None,
+            auto_commit_interval_messages = 20,
+            consumer_timeout_ms = 100,
+            auto_offset_reset='smallest',
+        )
+
+        # 181-200
+        output_msgs2 = []
+        with self.assertRaises(ConsumerTimeout):
+            while True:
+                m = consumer2.next()
+                output_msgs2.append(m)
+        self.assert_message_count(output_msgs2, 20)
+        self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)
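
For context, here is a minimal sketch (not part of this commit) of the SimpleConsumer calls the integration tests above exercise; the broker address, group, and topic names are illustrative assumptions:

    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient('localhost:9092')            # assumed broker address
    consumer = SimpleConsumer(client, b'my-group', b'my-topic',
                              auto_commit=False, iter_timeout=0)

    consumer.seek(-10, 2)                             # rewind 10 messages from the end
    for msg in consumer.get_messages(count=10, block=True, timeout=5):
        print(msg.message.value)

    consumer.stop()
    client.close()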

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_context.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_context.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_context.py
new file mode 100644
index 0000000..da9b22f
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_context.py
@@ -0,0 +1,117 @@
+"""
+OffsetCommitContext tests.
+"""
+from . import unittest
+
+from mock import MagicMock, patch
+
+from kafka.common import OffsetOutOfRangeError
+from kafka.context import OffsetCommitContext
+
+
+class TestOffsetCommitContext(unittest.TestCase):
+    """
+    OffsetCommitContext tests.
+    """
+
+    def setUp(self):
+        self.client = MagicMock()
+        self.consumer = MagicMock()
+        self.topic = "topic"
+        self.group = "group"
+        self.partition = 0
+        self.consumer.topic = self.topic
+        self.consumer.group = self.group
+        self.consumer.client = self.client
+        self.consumer.offsets = {self.partition: 0}
+        self.context = OffsetCommitContext(self.consumer)
+
+    def test_noop(self):
+        """
+        Should revert consumer after context exit with no mark() call.
+        """
+        with self.context:
+            # advance offset
+            self.consumer.offsets = {self.partition: 1}
+
+        # offset restored
+        self.assertEqual(self.consumer.offsets, {self.partition: 0})
+        # and seek called with relative zero delta
+        self.assertEqual(self.consumer.seek.call_count, 1)
+        self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
+
+    def test_mark(self):
+        """
+        Should remain at marked location after context exit.
+        """
+        with self.context as context:
+            context.mark(self.partition, 0)
+            # advance offset
+            self.consumer.offsets = {self.partition: 1}
+
+        # offset sent to client
+        self.assertEqual(self.client.send_offset_commit_request.call_count, 1)
+
+        # offset remains advanced
+        self.assertEqual(self.consumer.offsets, {self.partition: 1})
+
+        # and seek called with relative zero delta
+        self.assertEqual(self.consumer.seek.call_count, 1)
+        self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
+
+    def test_mark_multiple(self):
+        """
+        Should remain at highest marked location after context exit.
+        """
+        with self.context as context:
+            context.mark(self.partition, 0)
+            context.mark(self.partition, 1)
+            context.mark(self.partition, 2)
+            # advance offset
+            self.consumer.offsets = {self.partition: 3}
+
+        # offset sent to client
+        self.assertEqual(self.client.send_offset_commit_request.call_count, 1)
+
+        # offset remains advanced
+        self.assertEqual(self.consumer.offsets, {self.partition: 3})
+
+        # and seek called with relative zero delta
+        self.assertEqual(self.consumer.seek.call_count, 1)
+        self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
+
+    def test_rollback(self):
+        """
+        Should rollback to initial offsets on context exit with exception.
+        """
+        with self.assertRaises(Exception):
+            with self.context as context:
+                context.mark(self.partition, 0)
+                # advance offset
+                self.consumer.offsets = {self.partition: 1}
+
+                raise Exception("Intentional failure")
+
+        # offset rolled back (ignoring mark)
+        self.assertEqual(self.consumer.offsets, {self.partition: 0})
+
+        # and seek called with relative zero delta
+        self.assertEqual(self.consumer.seek.call_count, 1)
+        self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
+
+    def test_out_of_range(self):
+        """
+        Should reset to beginning of valid offsets on `OffsetOutOfRangeError`
+        """
+        def _seek(offset, whence):
+            # seek must be called with 0, 0 to find the beginning of the range
+            self.assertEqual(offset, 0)
+            self.assertEqual(whence, 0)
+            # set offsets to something different
+            self.consumer.offsets = {self.partition: 100}
+
+        with patch.object(self.consumer, "seek", _seek):
+            with self.context:
+                raise OffsetOutOfRangeError()
+
+        self.assertEqual(self.consumer.offsets, {self.partition: 100})
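
For context, a minimal sketch (not part of this commit) of the commit-on-success pattern that OffsetCommitContext provides and which these unit tests mock out; the client address, group, topic, and single-partition setup are illustrative assumptions:

    from kafka import KafkaClient, SimpleConsumer
    from kafka.context import OffsetCommitContext

    client = KafkaClient('localhost:9092')            # assumed broker address
    consumer = SimpleConsumer(client, b'my-group', b'my-topic',
                              partitions=[0], auto_commit=False)

    with OffsetCommitContext(consumer) as context:
        for msg in consumer.get_messages(count=10):
            # mark() records the offset as handled; on a clean exit the context
            # commits the marked offsets, on an exception it rewinds the consumer.
            context.mark(0, msg.offset)

    consumer.stop()
    client.close()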

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_failover_integration.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_failover_integration.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_failover_integration.py
new file mode 100644
index 0000000..7d27526
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_failover_integration.py
@@ -0,0 +1,201 @@
+import logging
+import os
+import time
+
+from . import unittest
+
+from kafka import KafkaClient, SimpleConsumer
+from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
+from kafka.producer.base import Producer
+from kafka.producer import KeyedProducer
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import (
+    KafkaIntegrationTestCase, kafka_versions, random_string
+)
+
+
+class TestFailover(KafkaIntegrationTestCase):
+    create_client = False
+
+    def setUp(self):
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
+        zk_chroot = random_string(10)
+        replicas = 2
+        partitions = 2
+
+        # mini zookeeper, 2 kafka brokers
+        self.zk = ZookeeperFixture.instance()
+        kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
+        self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+
+        hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
+        self.client = KafkaClient(hosts)
+        super(TestFailover, self).setUp()
+
+    def tearDown(self):
+        super(TestFailover, self).tearDown()
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
+        self.client.close()
+        for broker in self.brokers:
+            broker.close()
+        self.zk.close()
+
+    @kafka_versions("all")
+    def test_switch_leader(self):
+        topic = self.topic
+        partition = 0
+
+        # Testing the base Producer class here so that we can easily send
+        # messages to a specific partition, kill the leader for that partition
+        # and check that after another broker takes leadership the producer
+        # is able to resume sending messages
+
+        # require that the server commit messages to all in-sync replicas
+        # so that failover doesn't lose any messages on server-side
+        # and we can assert that server-side message count equals client-side
+        producer = Producer(self.client, async=False,
+                            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
+
+        # Send 100 random messages to a specific partition
+        self._send_random_messages(producer, topic, partition, 100)
+
+        # kill leader for partition
+        self._kill_leader(topic, partition)
+
+        # expect failure, but don't wait more than 60 secs to recover
+        recovered = False
+        started = time.time()
+        timeout = 60
+        while not recovered and (time.time() - started) < timeout:
+            try:
+                logging.debug("attempting to send 'success' message after leader killed")
+                producer.send_messages(topic, partition, b'success')
+                logging.debug("success!")
+                recovered = True
+            except (FailedPayloadsError, ConnectionError):
+                logging.debug("caught exception sending message -- will retry")
+                continue
+
+        # Verify we successfully sent the message
+        self.assertTrue(recovered)
+
+        # send some more messages to new leader
+        self._send_random_messages(producer, topic, partition, 100)
+
+        # count number of messages
+        # Should be equal to 100 before + 1 recovery + 100 after
+        self.assert_message_count(topic, 201, partitions=(partition,))
+
+
+    #@kafka_versions("all")
+    @unittest.skip("async producer does not support reliable failover yet")
+    def test_switch_leader_async(self):
+        topic = self.topic
+        partition = 0
+
+        # Test the base class Producer -- send_messages to a specific partition
+        producer = Producer(self.client, async=True)
+
+        # Send 10 random messages
+        self._send_random_messages(producer, topic, partition, 10)
+
+        # kill leader for partition
+        self._kill_leader(topic, partition)
+
+        logging.debug("attempting to send 'success' message after leader killed")
+
+        # in async mode, this should return immediately
+        producer.send_messages(topic, partition, b'success')
+
+        # send to new leader
+        self._send_random_messages(producer, topic, partition, 10)
+
+        # wait until producer queue is empty
+        while not producer.queue.empty():
+            time.sleep(0.1)
+        producer.stop()
+
+        # count number of messages
+        # Should be equal to 10 before + 1 recovery + 10 after
+        self.assert_message_count(topic, 21, partitions=(partition,))
+
+    @kafka_versions("all")
+    def test_switch_leader_keyed_producer(self):
+        topic = self.topic
+
+        producer = KeyedProducer(self.client, async=False)
+
+        # Send 10 random messages
+        for _ in range(10):
+            key = random_string(3)
+            msg = random_string(10)
+            producer.send_messages(topic, key, msg)
+
+        # kill leader for partition 0
+        self._kill_leader(topic, 0)
+
+        recovered = False
+        started = time.time()
+        timeout = 60
+        while not recovered and (time.time() - started) < timeout:
+            try:
+                key = random_string(3)
+                msg = random_string(10)
+                producer.send_messages(topic, key, msg)
+                if producer.partitioners[topic].partition(key) == 0:
+                    recovered = True
+            except (FailedPayloadsError, ConnectionError):
+                logging.debug("caught exception sending message -- will retry")
+                continue
+
+        # Verify we successfully sent the message
+        self.assertTrue(recovered)
+
+        # send some more messages just to make sure no more exceptions
+        for _ in range(10):
+            key = random_string(3)
+            msg = random_string(10)
+            producer.send_messages(topic, key, msg)
+
+
+    def _send_random_messages(self, producer, topic, partition, n):
+        for j in range(n):
+            logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
+            resp = producer.send_messages(topic, partition, random_string(10))
+            if len(resp) > 0:
+                self.assertEqual(resp[0].error, 0)
+            logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
+
+    def _kill_leader(self, topic, partition):
+        leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
+        broker = self.brokers[leader.nodeId]
+        broker.close()
+        return broker
+
+    def assert_message_count(self, topic, check_count, timeout=10, partitions=None):
+        hosts = ','.join(['%s:%d' % (broker.host, broker.port)
+                          for broker in self.brokers])
+
+        client = KafkaClient(hosts)
+        group = random_string(10)
+        consumer = SimpleConsumer(client, group, topic,
+                                  partitions=partitions,
+                                  auto_commit=False,
+                                  iter_timeout=timeout)
+
+        started_at = time.time()
+        pending = consumer.pending(partitions)
+
+        # Keep checking if it isn't immediately correct, subject to timeout
+        while pending != check_count and (time.time() - started_at < timeout):
+            pending = consumer.pending(partitions)
+
+        consumer.stop()
+        client.close()
+
+        self.assertEqual(pending, check_count)
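
For context, a minimal sketch (not part of this commit) of the base Producer configuration the leader-failover test above relies on; the broker address and topic are illustrative assumptions:

    from kafka import KafkaClient
    from kafka.producer.base import Producer

    client = KafkaClient('localhost:9092')            # assumed broker address

    # Require the leader to wait for all in-sync replicas before acking, so a
    # leader switch should not lose messages that were acknowledged to the client.
    producer = Producer(client, async=False,
                        req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)

    producer.send_messages(b'my-topic', 0, b'payload')  # topic, partition, message
    producer.stop()
    client.close()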

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_package.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_package.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_package.py
new file mode 100644
index 0000000..e91753c
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_package.py
@@ -0,0 +1,29 @@
+from . import unittest
+
+class TestPackage(unittest.TestCase):
+    def test_top_level_namespace(self):
+        import kafka as kafka1
+        self.assertEqual(kafka1.KafkaClient.__name__, "KafkaClient")
+        self.assertEqual(kafka1.client.__name__, "kafka.client")
+        self.assertEqual(kafka1.codec.__name__, "kafka.codec")
+
+    def test_submodule_namespace(self):
+        import kafka.client as client1
+        self.assertEqual(client1.__name__, "kafka.client")
+        self.assertEqual(client1.KafkaClient.__name__, "KafkaClient")
+
+        from kafka import client as client2
+        self.assertEqual(client2.__name__, "kafka.client")
+        self.assertEqual(client2.KafkaClient.__name__, "KafkaClient")
+
+        from kafka.client import KafkaClient as KafkaClient1
+        self.assertEqual(KafkaClient1.__name__, "KafkaClient")
+
+        from kafka.codec import gzip_encode as gzip_encode1
+        self.assertEqual(gzip_encode1.__name__, "gzip_encode")
+
+        from kafka import KafkaClient as KafkaClient2
+        self.assertEqual(KafkaClient2.__name__, "KafkaClient")
+
+        from kafka.codec import snappy_encode
+        self.assertEqual(snappy_encode.__name__, "snappy_encode")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_producer.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_producer.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_producer.py
new file mode 100644
index 0000000..f6b3d6a
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_producer.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+
+import logging
+
+from mock import MagicMock
+from . import unittest
+
+from kafka.producer.base import Producer
+
+
+class TestKafkaProducer(unittest.TestCase):
+    def test_producer_message_types(self):
+
+        producer = Producer(MagicMock())
+        topic = b"test-topic"
+        partition = 0
+
+        bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'})
+        for m in bad_data_types:
+            with self.assertRaises(TypeError):
+                logging.debug("attempting to send message of type %s", type(m))
+                producer.send_messages(topic, partition, m)
+
+        good_data_types = (b'a string!',)
+        for m in good_data_types:
+            # This should not raise an exception
+            producer.send_messages(topic, partition, m)
+
+    def test_topic_message_types(self):
+        from kafka.producer.simple import SimpleProducer
+
+        client = MagicMock()
+
+        def partitions(topic):
+            return [0, 1]
+
+        client.get_partition_ids_for_topic = partitions
+
+        producer = SimpleProducer(client, random_start=False)
+        topic = b"test-topic"
+        producer.send_messages(topic, b'hi')
+        assert client.send_produce_request.called

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_producer_integration.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_producer_integration.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_producer_integration.py
new file mode 100644
index 0000000..38df69f
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_producer_integration.py
@@ -0,0 +1,481 @@
+import os
+import time
+import uuid
+
+from six.moves import range
+
+from kafka import (
+    SimpleProducer, KeyedProducer,
+    create_message, create_gzip_message, create_snappy_message,
+    RoundRobinPartitioner, HashedPartitioner
+)
+from kafka.codec import has_snappy
+from kafka.common import (
+    FetchRequest, ProduceRequest,
+    UnknownTopicOrPartitionError, LeaderNotAvailableError
+)
+from kafka.producer.base import Producer
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import KafkaIntegrationTestCase, kafka_versions
+
+class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
+
+    @classmethod
+    def setUpClass(cls):  # noqa
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
+        cls.zk = ZookeeperFixture.instance()
+        cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+
+    @classmethod
+    def tearDownClass(cls):  # noqa
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
+        cls.server.close()
+        cls.zk.close()
+
+    @kafka_versions("all")
+    def test_produce_many_simple(self):
+        start_offset = self.current_offset(self.topic, 0)
+
+        self.assert_produce_request(
+            [create_message(("Test message %d" % i).encode('utf-8'))
+             for i in range(100)],
+            start_offset,
+            100,
+        )
+
+        self.assert_produce_request(
+            [create_message(("Test message %d" % i).encode('utf-8'))
+             for i in range(100)],
+            start_offset+100,
+            100,
+        )
+
+    @kafka_versions("all")
+    def test_produce_10k_simple(self):
+        start_offset = self.current_offset(self.topic, 0)
+
+        self.assert_produce_request(
+            [create_message(("Test message %d" % i).encode('utf-8'))
+             for i in range(10000)],
+            start_offset,
+            10000,
+        )
+
+    @kafka_versions("all")
+    def test_produce_many_gzip(self):
+        start_offset = self.current_offset(self.topic, 0)
+
+        message1 = create_gzip_message([
+            ("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)])
+        message2 = create_gzip_message([
+            ("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)])
+
+        self.assert_produce_request(
+            [ message1, message2 ],
+            start_offset,
+            200,
+        )
+
+    @kafka_versions("all")
+    def test_produce_many_snappy(self):
+        self.skipTest("All snappy integration tests fail with nosnappyjava")
+        start_offset = self.current_offset(self.topic, 0)
+
+        self.assert_produce_request([
+                create_snappy_message(["Snappy 1 %d" % i for i in range(100)]),
+                create_snappy_message(["Snappy 2 %d" % i for i in range(100)]),
+            ],
+            start_offset,
+            200,
+        )
+
+    @kafka_versions("all")
+    def test_produce_mixed(self):
+        start_offset = self.current_offset(self.topic, 0)
+
+        msg_count = 1+100
+        messages = [
+            create_message(b"Just a plain message"),
+            create_gzip_message([
+                ("Gzipped %d" % i).encode('utf-8') for i in range(100)]),
+        ]
+
+        # All snappy integration tests fail with nosnappyjava
+        if False and has_snappy():
+            msg_count += 100
+            messages.append(create_snappy_message(["Snappy %d" % i for i in range(100)]))
+
+        self.assert_produce_request(messages, start_offset, msg_count)
+
+    @kafka_versions("all")
+    def test_produce_100k_gzipped(self):
+        start_offset = self.current_offset(self.topic, 0)
+
+        self.assert_produce_request([
+            create_gzip_message([
+                ("Gzipped batch 1, message %d" % i).encode('utf-8')
+                for i in range(50000)])
+            ],
+            start_offset,
+            50000,
+        )
+
+        self.assert_produce_request([
+            create_gzip_message([
+                ("Gzipped batch 1, message %d" % i).encode('utf-8')
+                for i in range(50000)])
+            ],
+            start_offset+50000,
+            50000,
+        )
+
+    ############################
+    #   SimpleProducer Tests   #
+    ############################
+
+    @kafka_versions("all")
+    def test_simple_producer(self):
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+        producer = SimpleProducer(self.client, random_start=False)
+
+        # With random_start=False, the first send goes to partition 0
+        resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
+        self.assert_produce_response(resp, start_offsets[0])
+
+        # The next send goes to the next partition
+        resp = producer.send_messages(self.topic, self.msg("three"))
+        self.assert_produce_response(resp, start_offsets[1])
+
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two") ])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("three") ])
+
+        # Goes back to the first partition because there are only two partitions
+        resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
+        self.assert_produce_response(resp, start_offsets[0]+2)
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_produce__new_topic_fails_with_reasonable_error(self):
+        new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8')
+        producer = SimpleProducer(self.client, random_start=False)
+
+        # At first it doesn't exist
+        with self.assertRaises((UnknownTopicOrPartitionError,
+                                LeaderNotAvailableError)):
+            producer.send_messages(new_topic, self.msg("one"))
+
+    @kafka_versions("all")
+    def test_producer_random_order(self):
+        producer = SimpleProducer(self.client, random_start=True)
+        resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
+        resp2 = producer.send_messages(self.topic, self.msg("three"))
+        resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
+
+        self.assertEqual(resp1[0].partition, resp3[0].partition)
+        self.assertNotEqual(resp1[0].partition, resp2[0].partition)
+
+    @kafka_versions("all")
+    def test_producer_ordered_start(self):
+        producer = SimpleProducer(self.client, random_start=False)
+        resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
+        resp2 = producer.send_messages(self.topic, self.msg("three"))
+        resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
+
+        self.assertEqual(resp1[0].partition, 0)
+        self.assertEqual(resp2[0].partition, 1)
+        self.assertEqual(resp3[0].partition, 0)
+
+    @kafka_versions("all")
+    def test_async_simple_producer(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = SimpleProducer(self.client, async=True, random_start=False)
+        resp = producer.send_messages(self.topic, self.msg("one"))
+        self.assertEqual(len(resp), 0)
+
+        # wait for the server to report a new highwatermark
+        while self.current_offset(self.topic, partition) == start_offset:
+          time.sleep(0.1)
+
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_batched_simple_producer__triggers_by_message(self):
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+        # Configure batch producer
+        batch_messages = 5
+        batch_interval = 5
+        producer = SimpleProducer(
+            self.client,
+            batch_send=True,
+            batch_send_every_n=batch_messages,
+            batch_send_every_t=batch_interval,
+            random_start=False)
+
+        # Send 4 messages -- should not trigger a batch
+        resp = producer.send_messages(self.topic,
+            self.msg("one"),
+            self.msg("two"),
+            self.msg("three"),
+            self.msg("four"),
+        )
+
+        # Batch mode is async. No ack
+        self.assertEqual(len(resp), 0)
+
+        # It hasn't sent yet
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [])
+
+        # send 3 more messages -- should trigger batch on first 5
+        resp = producer.send_messages(self.topic,
+            self.msg("five"),
+            self.msg("six"),
+            self.msg("seven"),
+        )
+
+        # Batch mode is async. No ack
+        self.assertEqual(len(resp), 0)
+
+        # send_messages groups all messages passed in a single call into the same
+        # partition, so we should see every message from the first call in one partition
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [
+            self.msg("one"),
+            self.msg("two"),
+            self.msg("three"),
+            self.msg("four"),
+        ])
+
+        # Because we are batching every 5 messages, we should only see one
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [
+            self.msg("five"),
+        ])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_batched_simple_producer__triggers_by_time(self):
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+        batch_interval = 5
+        producer = SimpleProducer(self.client,
+            batch_send=True,
+            batch_send_every_n=100,
+            batch_send_every_t=batch_interval,
+            random_start=False)
+
+        # Send 4 messages -- fewer than batch_send_every_n, so no batch is triggered yet
+        resp = producer.send_messages(self.topic,
+            self.msg("one"),
+            self.msg("two"),
+            self.msg("three"),
+            self.msg("four"),
+        )
+
+        # Batch mode is async. No ack
+        self.assertEqual(len(resp), 0)
+
+        # It hasn't sent yet
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [])
+
+        resp = producer.send_messages(self.topic,
+            self.msg("five"),
+            self.msg("six"),
+            self.msg("seven"),
+        )
+
+        # Batch mode is async. No ack
+        self.assertEqual(len(resp), 0)
+
+        # Wait the timeout out
+        time.sleep(batch_interval)
+
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [
+            self.msg("one"),
+            self.msg("two"),
+            self.msg("three"),
+            self.msg("four"),
+        ])
+
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [
+            self.msg("five"),
+            self.msg("six"),
+            self.msg("seven"),
+        ])
+
+        producer.stop()
+
+
+    ############################
+    #   KeyedProducer Tests    #
+    ############################
+
+    @kafka_versions("all")
+    def test_round_robin_partitioner(self):
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
+        resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
+        resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
+        resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
+        resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
+
+        self.assert_produce_response(resp1, start_offsets[0]+0)
+        self.assert_produce_response(resp2, start_offsets[1]+0)
+        self.assert_produce_response(resp3, start_offsets[0]+1)
+        self.assert_produce_response(resp4, start_offsets[1]+1)
+
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("three") ])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("two"), self.msg("four")  ])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_hashed_partitioner(self):
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+        producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
+        resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
+        resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
+        resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
+        resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
+        resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
+
+        offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]}
+        messages = {partitions[0]: [], partitions[1]: []}
+
+        keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
+        resps = [resp1, resp2, resp3, resp4, resp5]
+        msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
+
+        for key, resp, msg in zip(keys, resps, msgs):
+            k = hash(key) % 2
+            partition = partitions[k]
+            offset = offsets[partition]
+            self.assert_produce_response(resp, offset)
+            offsets[partition] += 1
+            messages[partition].append(msg)
+
+        self.assert_fetch_offset(partitions[0], start_offsets[0], messages[partitions[0]])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], messages[partitions[1]])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_async_keyed_producer(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
+
+        resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
+        self.assertEqual(len(resp), 0)
+
+        # wait for the server to report a new highwatermark
+        while self.current_offset(self.topic, partition) == start_offset:
+          time.sleep(0.1)
+
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+        producer.stop()
+
+    ############################
+    #   Producer ACK Tests     #
+    ############################
+
+    @kafka_versions("all")
+    def test_acks_none(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = Producer(
+            self.client,
+            req_acks=Producer.ACK_NOT_REQUIRED,
+        )
+        resp = producer.send_messages(self.topic, partition, self.msg("one"))
+
+        # No response from produce request with no acks required
+        self.assertEqual(len(resp), 0)
+
+        # But the message should still have been delivered
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_acks_local_write(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = Producer(
+            self.client,
+            req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+        )
+        resp = producer.send_messages(self.topic, partition, self.msg("one"))
+
+        self.assert_produce_response(resp, start_offset)
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_acks_cluster_commit(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = Producer(
+            self.client,
+            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
+        )
+
+        resp = producer.send_messages(self.topic, partition, self.msg("one"))
+        self.assert_produce_response(resp, start_offset)
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+        producer.stop()
+
+    def assert_produce_request(self, messages, initial_offset, message_ct,
+                               partition=0):
+        produce = ProduceRequest(self.topic, partition, messages=messages)
+
+        # There should only be one response message from the server.
+        # This will throw an exception if there's more than one.
+        resp = self.client.send_produce_request([ produce ])
+        self.assert_produce_response(resp, initial_offset)
+
+        self.assertEqual(self.current_offset(self.topic, partition), initial_offset + message_ct)
+
+    def assert_produce_response(self, resp, initial_offset):
+        self.assertEqual(len(resp), 1)
+        self.assertEqual(resp[0].error, 0)
+        self.assertEqual(resp[0].offset, initial_offset)
+
+    def assert_fetch_offset(self, partition, start_offset, expected_messages):
+        # There should only be one response message from the server.
+        # This will throw an exception if there's more than one.
+
+        resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ])
+
+        self.assertEqual(resp.error, 0)
+        self.assertEqual(resp.partition, partition)
+        messages = [ x.message.value for x in resp.messages ]
+
+        self.assertEqual(messages, expected_messages)
+        self.assertEqual(resp.highwaterMark, start_offset+len(expected_messages))
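
For context, a minimal sketch (not part of this commit) of the SimpleProducer and KeyedProducer calls covered by the tests above; the broker address, topic, and key are illustrative assumptions:

    from kafka import KafkaClient, SimpleProducer, KeyedProducer, RoundRobinPartitioner

    client = KafkaClient('localhost:9092')            # assumed broker address

    # Synchronous SimpleProducer: send_messages returns one ProduceResponse per request
    producer = SimpleProducer(client, random_start=False)
    resp = producer.send_messages(b'my-topic', b'one', b'two')
    producer.stop()

    # KeyedProducer routes each message by key through the chosen partitioner
    keyed = KeyedProducer(client, partitioner=RoundRobinPartitioner)
    keyed.send(b'my-topic', b'key1', b'keyed message')
    keyed.stop()

    client.close()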

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_protocol.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_protocol.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_protocol.py
new file mode 100644
index 0000000..d20f591
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_protocol.py
@@ -0,0 +1,732 @@
+from contextlib import contextmanager
+import struct
+
+import six
+from mock import patch, sentinel
+from . import unittest
+
+from kafka.codec import has_snappy, gzip_decode, snappy_decode
+from kafka.common import (
+    OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
+    OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
+    ProduceRequest, FetchRequest, Message, ChecksumError,
+    ProduceResponse, FetchResponse, OffsetAndMessage,
+    BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
+    KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
+    ProtocolError
+)
+from kafka.protocol import (
+    ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
+    create_message, create_gzip_message, create_snappy_message,
+    create_message_set
+)
+
+class TestProtocol(unittest.TestCase):
+    def test_create_message(self):
+        payload = "test"
+        key = "key"
+        msg = create_message(payload, key)
+        self.assertEqual(msg.magic, 0)
+        self.assertEqual(msg.attributes, 0)
+        self.assertEqual(msg.key, key)
+        self.assertEqual(msg.value, payload)
+
+    def test_create_gzip(self):
+        payloads = [b"v1", b"v2"]
+        msg = create_gzip_message(payloads)
+        self.assertEqual(msg.magic, 0)
+        self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
+        self.assertEqual(msg.key, None)
+        # Need to decode to check since gzipped payload is non-deterministic
+        decoded = gzip_decode(msg.value)
+        expect = b"".join([
+            struct.pack(">q", 0),          # MsgSet offset
+            struct.pack(">i", 16),         # MsgSet size
+            struct.pack(">i", 1285512130), # CRC
+            struct.pack(">bb", 0, 0),      # Magic, flags
+            struct.pack(">i", -1),         # -1 indicates a null key
+            struct.pack(">i", 2),          # Msg length (bytes)
+            b"v1",                         # Message contents
+
+            struct.pack(">q", 0),          # MsgSet offset
+            struct.pack(">i", 16),         # MsgSet size
+            struct.pack(">i", -711587208), # CRC
+            struct.pack(">bb", 0, 0),      # Magic, flags
+            struct.pack(">i", -1),         # -1 indicates a null key
+            struct.pack(">i", 2),          # Msg length (bytes)
+            b"v2",                         # Message contents
+        ])
+
+        self.assertEqual(decoded, expect)
+
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_create_snappy(self):
+        payloads = [b"v1", b"v2"]
+        msg = create_snappy_message(payloads)
+        self.assertEqual(msg.magic, 0)
+        self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
+        self.assertEqual(msg.key, None)
+        decoded = snappy_decode(msg.value)
+        expect = b"".join([
+            struct.pack(">q", 0),          # MsgSet offset
+            struct.pack(">i", 16),         # MsgSet size
+            struct.pack(">i", 1285512130), # CRC
+            struct.pack(">bb", 0, 0),      # Magic, flags
+            struct.pack(">i", -1),         # -1 indicates a null key
+            struct.pack(">i", 2),          # Msg length (bytes)
+            b"v1",                         # Message contents
+
+            struct.pack(">q", 0),          # MsgSet offset
+            struct.pack(">i", 16),         # MsgSet size
+            struct.pack(">i", -711587208), # CRC
+            struct.pack(">bb", 0, 0),      # Magic, flags
+            struct.pack(">i", -1),         # -1 indicates a null key
+            struct.pack(">i", 2),          # Msg length (bytes)
+            b"v2",                         # Message contents
+        ])
+
+        self.assertEqual(decoded, expect)
+
+    def test_encode_message_header(self):
+        expect = b"".join([
+            struct.pack(">h", 10),             # API Key
+            struct.pack(">h", 0),              # API Version
+            struct.pack(">i", 4),              # Correlation Id
+            struct.pack(">h", len("client3")), # Length of clientId
+            b"client3",                         # ClientId
+        ])
+
+        encoded = KafkaProtocol._encode_message_header(b"client3", 4, 10)
+        self.assertEqual(encoded, expect)
+
+    def test_encode_message(self):
+        message = create_message(b"test", b"key")
+        encoded = KafkaProtocol._encode_message(message)
+        expect = b"".join([
+            struct.pack(">i", -1427009701), # CRC
+            struct.pack(">bb", 0, 0),       # Magic, flags
+            struct.pack(">i", 3),           # Length of key
+            b"key",                          # key
+            struct.pack(">i", 4),           # Length of value
+            b"test",                         # value
+        ])
+
+        self.assertEqual(encoded, expect)
+
+    def test_decode_message(self):
+        encoded = b"".join([
+            struct.pack(">i", -1427009701), # CRC
+            struct.pack(">bb", 0, 0),       # Magic, flags
+            struct.pack(">i", 3),           # Length of key
+            b"key",                         # key
+            struct.pack(">i", 4),           # Length of value
+            b"test",                        # value
+        ])
+
+        offset = 10
+        (returned_offset, decoded_message) = list(KafkaProtocol._decode_message(encoded, offset))[0]
+
+        self.assertEqual(returned_offset, offset)
+        self.assertEqual(decoded_message, create_message(b"test", b"key"))
+
+    def test_encode_message_failure(self):
+        with self.assertRaises(ProtocolError):
+            KafkaProtocol._encode_message(Message(1, 0, "key", "test"))
+
+    def test_encode_message_set(self):
+        message_set = [
+            create_message(b"v1", b"k1"),
+            create_message(b"v2", b"k2")
+        ]
+
+        encoded = KafkaProtocol._encode_message_set(message_set)
+        expect = b"".join([
+            struct.pack(">q", 0),          # MsgSet Offset
+            struct.pack(">i", 18),         # Msg Size
+            struct.pack(">i", 1474775406), # CRC
+            struct.pack(">bb", 0, 0),      # Magic, flags
+            struct.pack(">i", 2),          # Length of key
+            b"k1",                          # Key
+            struct.pack(">i", 2),          # Length of value
+            b"v1",                          # Value
+
+            struct.pack(">q", 0),          # MsgSet Offset
+            struct.pack(">i", 18),         # Msg Size
+            struct.pack(">i", -16383415),  # CRC
+            struct.pack(">bb", 0, 0),      # Magic, flags
+            struct.pack(">i", 2),          # Length of key
+            b"k2",                          # Key
+            struct.pack(">i", 2),          # Length of value
+            b"v2",                          # Value
+        ])
+
+        self.assertEqual(encoded, expect)
+
+    def test_decode_message_set(self):
+        encoded = b"".join([
+            struct.pack(">q", 0),          # MsgSet Offset
+            struct.pack(">i", 18),         # Msg Size
+            struct.pack(">i", 1474775406), # CRC
+            struct.pack(">bb", 0, 0),      # Magic, flags
+            struct.pack(">i", 2),          # Length of key
+            b"k1",                         # Key
+            struct.pack(">i", 2),          # Length of value
+            b"v1",                         # Value
+
+            struct.pack(">q", 1),          # MsgSet Offset
+            struct.pack(">i", 18),         # Msg Size
+            struct.pack(">i", -16383415),  # CRC
+            struct.pack(">bb", 0, 0),      # Magic, flags
+            struct.pack(">i", 2),          # Length of key
+            b"k2",                         # Key
+            struct.pack(">i", 2),          # Length of value
+            b"v2",                         # Value
+        ])
+
+        msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
+        self.assertEqual(len(msgs), 2)
+        msg1, msg2 = msgs
+
+        returned_offset1, decoded_message1 = msg1
+        returned_offset2, decoded_message2 = msg2
+
+        self.assertEqual(returned_offset1, 0)
+        self.assertEqual(decoded_message1, create_message(b"v1", b"k1"))
+
+        self.assertEqual(returned_offset2, 1)
+        self.assertEqual(decoded_message2, create_message(b"v2", b"k2"))
+
+    def test_decode_message_gzip(self):
+        gzip_encoded = (b'\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000'
+                        b'\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01'
+                        b'\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8'
+                        b'\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00'
+                        b'\x00')
+        offset = 11
+        messages = list(KafkaProtocol._decode_message(gzip_encoded, offset))
+
+        self.assertEqual(len(messages), 2)
+        msg1, msg2 = messages
+
+        returned_offset1, decoded_message1 = msg1
+        self.assertEqual(returned_offset1, 0)
+        self.assertEqual(decoded_message1, create_message(b"v1"))
+
+        returned_offset2, decoded_message2 = msg2
+        self.assertEqual(returned_offset2, 0)
+        self.assertEqual(decoded_message2, create_message(b"v2"))
+
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_decode_message_snappy(self):
+        snappy_encoded = (b'\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00'
+                          b'\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff'
+                          b'\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5'
+                          b'\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2')
+        offset = 11
+        messages = list(KafkaProtocol._decode_message(snappy_encoded, offset))
+        self.assertEqual(len(messages), 2)
+
+        msg1, msg2 = messages
+
+        returned_offset1, decoded_message1 = msg1
+        self.assertEqual(returned_offset1, 0)
+        self.assertEqual(decoded_message1, create_message(b"v1"))
+
+        returned_offset2, decoded_message2 = msg2
+        self.assertEqual(returned_offset2, 0)
+        self.assertEqual(decoded_message2, create_message(b"v2"))
+
+    def test_decode_message_checksum_error(self):
+        invalid_encoded_message = b"This is not a valid encoded message"
+        iter = KafkaProtocol._decode_message(invalid_encoded_message, 0)
+        self.assertRaises(ChecksumError, list, iter)
+
+    # NOTE: The error handling in _decode_message_set_iter() is questionable.
+    # If it's modified, the next two tests might need to be fixed.
+    def test_decode_message_set_fetch_size_too_small(self):
+        with self.assertRaises(ConsumerFetchSizeTooSmall):
+            list(KafkaProtocol._decode_message_set_iter(b'a'))
+
+    def test_decode_message_set_stop_iteration(self):
+        encoded = b"".join([
+            struct.pack(">q", 0),          # MsgSet Offset
+            struct.pack(">i", 18),         # Msg Size
+            struct.pack(">i", 1474775406), # CRC
+            struct.pack(">bb", 0, 0),      # Magic, flags
+            struct.pack(">i", 2),          # Length of key
+            b"k1",                         # Key
+            struct.pack(">i", 2),          # Length of value
+            b"v1",                         # Value
+
+            struct.pack(">q", 1),          # MsgSet Offset
+            struct.pack(">i", 18),         # Msg Size
+            struct.pack(">i", -16383415),  # CRC
+            struct.pack(">bb", 0, 0),      # Magic, flags
+            struct.pack(">i", 2),          # Length of key
+            b"k2",                         # Key
+            struct.pack(">i", 2),          # Length of value
+            b"v2",                         # Value
+            b"@1$%(Y!",                    # Random padding
+        ])
+
+        msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
+        self.assertEqual(len(msgs), 2)
+        msg1, msg2 = msgs
+
+        returned_offset1, decoded_message1 = msg1
+        returned_offset2, decoded_message2 = msg2
+
+        self.assertEqual(returned_offset1, 0)
+        self.assertEqual(decoded_message1, create_message(b"v1", b"k1"))
+
+        self.assertEqual(returned_offset2, 1)
+        self.assertEqual(decoded_message2, create_message(b"v2", b"k2"))
+
+    def test_encode_produce_request(self):
+        requests = [
+            ProduceRequest(b"topic1", 0, [
+                create_message(b"a"),
+                create_message(b"b")
+            ]),
+            ProduceRequest(b"topic2", 1, [
+                create_message(b"c")
+            ])
+        ]
+
+        msg_a_binary = KafkaProtocol._encode_message(create_message(b"a"))
+        msg_b_binary = KafkaProtocol._encode_message(create_message(b"b"))
+        msg_c_binary = KafkaProtocol._encode_message(create_message(b"c"))
+
+        header = b"".join([
+            struct.pack('>i', 0x94),                   # The length of the message overall
+            struct.pack('>h', 0),                      # Msg Header, Message type = Produce
+            struct.pack('>h', 0),                      # Msg Header, API version
+            struct.pack('>i', 2),                      # Msg Header, Correlation ID
+            struct.pack('>h7s', 7, b"client1"),        # Msg Header, The client ID
+            struct.pack('>h', 2),                      # Num acks required
+            struct.pack('>i', 100),                    # Request Timeout
+            struct.pack('>i', 2),                      # The number of requests
+        ])
+
+        total_len = len(msg_a_binary) + len(msg_b_binary)
+        topic1 = b"".join([
+            struct.pack('>h6s', 6, b'topic1'),         # The topic1
+            struct.pack('>i', 1),                      # One message set
+            struct.pack('>i', 0),                      # Partition 0
+            struct.pack('>i', total_len + 24),         # Size of the incoming message set
+            struct.pack('>q', 0),                      # No offset specified
+            struct.pack('>i', len(msg_a_binary)),      # Length of message
+            msg_a_binary,                              # Actual message
+            struct.pack('>q', 0),                      # No offset specified
+            struct.pack('>i', len(msg_b_binary)),      # Length of message
+            msg_b_binary,                              # Actual message
+        ])
+
+        topic2 = b"".join([
+            struct.pack('>h6s', 6, b'topic2'),         # The topic2
+            struct.pack('>i', 1),                      # One message set
+            struct.pack('>i', 1),                      # Partition 1
+            struct.pack('>i', len(msg_c_binary) + 12), # Size of the incoming message set
+            struct.pack('>q', 0),                      # No offset specified
+            struct.pack('>i', len(msg_c_binary)),      # Length of message
+            msg_c_binary,                              # Actual message
+        ])
+
+        expected1 = b"".join([ header, topic1, topic2 ])
+        expected2 = b"".join([ header, topic2, topic1 ])
+
+        encoded = KafkaProtocol.encode_produce_request(b"client1", 2, requests, 2, 100)
+        self.assertIn(encoded, [ expected1, expected2 ])
+
+    def test_decode_produce_response(self):
+        t1 = b"topic1"
+        t2 = b"topic2"
+        _long = int
+        if six.PY2:
+            _long = long
+        encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)),
+                              2, 2, len(t1), t1, 2, 0, 0, _long(10), 1, 1, _long(20),
+                              len(t2), t2, 1, 0, 0, _long(30))
+        responses = list(KafkaProtocol.decode_produce_response(encoded))
+        self.assertEqual(responses,
+                         [ProduceResponse(t1, 0, 0, _long(10)),
+                          ProduceResponse(t1, 1, 1, _long(20)),
+                          ProduceResponse(t2, 0, 0, _long(30))])
+
+    def test_encode_fetch_request(self):
+        requests = [
+            FetchRequest(b"topic1", 0, 10, 1024),
+            FetchRequest(b"topic2", 1, 20, 100),
+        ]
+
+        header = b"".join([
+            struct.pack('>i', 89),             # The length of the message overall
+            struct.pack('>h', 1),              # Msg Header, Message type = Fetch
+            struct.pack('>h', 0),              # Msg Header, API version
+            struct.pack('>i', 3),              # Msg Header, Correlation ID
+            struct.pack('>h7s', 7, b"client1"),# Msg Header, The client ID
+            struct.pack('>i', -1),             # Replica Id
+            struct.pack('>i', 2),              # Max wait time
+            struct.pack('>i', 100),            # Min bytes
+            struct.pack('>i', 2),              # Num requests
+        ])
+
+        topic1 = b"".join([
+            struct.pack('>h6s', 6, b'topic1'),# Topic
+            struct.pack('>i', 1),             # Num Payloads
+            struct.pack('>i', 0),             # Partition 0
+            struct.pack('>q', 10),            # Offset
+            struct.pack('>i', 1024),          # Max Bytes
+        ])
+
+        topic2 = b"".join([
+            struct.pack('>h6s', 6, b'topic2'),# Topic
+            struct.pack('>i', 1),             # Num Payloads
+            struct.pack('>i', 1),             # Partition 1
+            struct.pack('>q', 20),            # Offset
+            struct.pack('>i', 100),           # Max Bytes
+        ])
+
+        expected1 = b"".join([ header, topic1, topic2 ])
+        expected2 = b"".join([ header, topic2, topic1 ])
+
+        encoded = KafkaProtocol.encode_fetch_request(b"client1", 3, requests, 2, 100)
+        self.assertIn(encoded, [ expected1, expected2 ])
+
+    def test_decode_fetch_response(self):
+        t1 = b"topic1"
+        t2 = b"topic2"
+        msgs = [create_message(msg)
+                for msg in [b"message1", b"hi", b"boo", b"foo", b"so fun!"]]
+        ms1 = KafkaProtocol._encode_message_set([msgs[0], msgs[1]])
+        ms2 = KafkaProtocol._encode_message_set([msgs[2]])
+        ms3 = KafkaProtocol._encode_message_set([msgs[3], msgs[4]])
+
+        encoded = struct.pack('>iih%dsiihqi%dsihqi%dsh%dsiihqi%ds' %
+                              (len(t1), len(ms1), len(ms2), len(t2), len(ms3)),
+                              4, 2, len(t1), t1, 2, 0, 0, 10, len(ms1), ms1, 1,
+                              1, 20, len(ms2), ms2, len(t2), t2, 1, 0, 0, 30,
+                              len(ms3), ms3)
+
+        responses = list(KafkaProtocol.decode_fetch_response(encoded))
+        def expand_messages(response):
+            return FetchResponse(response.topic, response.partition,
+                                 response.error, response.highwaterMark,
+                                 list(response.messages))
+
+        expanded_responses = list(map(expand_messages, responses))
+        expect = [FetchResponse(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
+                                               OffsetAndMessage(0, msgs[1])]),
+                  FetchResponse(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
+                  FetchResponse(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]),
+                                               OffsetAndMessage(0, msgs[4])])]
+        self.assertEqual(expanded_responses, expect)
+
+    def test_encode_metadata_request_no_topics(self):
+        expected = b"".join([
+            struct.pack(">i", 17),         # Total length of the request
+            struct.pack('>h', 3),          # API key metadata fetch
+            struct.pack('>h', 0),          # API version
+            struct.pack('>i', 4),          # Correlation ID
+            struct.pack('>h3s', 3, b"cid"),# The client ID
+            struct.pack('>i', 0),          # No topics, give all the data!
+        ])
+
+        encoded = KafkaProtocol.encode_metadata_request(b"cid", 4)
+
+        self.assertEqual(encoded, expected)
+
+    def test_encode_metadata_request_with_topics(self):
+        expected = b"".join([
+            struct.pack(">i", 25),         # Total length of the request
+            struct.pack('>h', 3),          # API key metadata fetch
+            struct.pack('>h', 0),          # API version
+            struct.pack('>i', 4),          # Correlation ID
+            struct.pack('>h3s', 3, b"cid"),# The client ID
+            struct.pack('>i', 2),          # Number of topics in the request
+            struct.pack('>h2s', 2, b"t1"), # Topic "t1"
+            struct.pack('>h2s', 2, b"t2"), # Topic "t2"
+        ])
+
+        encoded = KafkaProtocol.encode_metadata_request(b"cid", 4, [b"t1", b"t2"])
+
+        self.assertEqual(encoded, expected)
+
+    def _create_encoded_metadata_response(self, brokers, topics):
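+        # Build a metadata response by hand (correlation id 3, the broker
+        # list, then per-topic partition metadata) so that
+        # decode_metadata_response() can be checked against known input.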
+        encoded = []
+        encoded.append(struct.pack('>ii', 3, len(brokers)))
+        for broker in brokers:
+            encoded.append(struct.pack('>ih%dsi' % len(broker.host),
+                                       broker.nodeId, len(broker.host),
+                                       broker.host, broker.port))
+
+        encoded.append(struct.pack('>i', len(topics)))
+        for topic in topics:
+            encoded.append(struct.pack('>hh%dsi' % len(topic.topic),
+                                       topic.error, len(topic.topic),
+                                       topic.topic, len(topic.partitions)))
+            for metadata in topic.partitions:
+                encoded.append(struct.pack('>hiii', metadata.error,
+                                           metadata.partition, metadata.leader,
+                                           len(metadata.replicas)))
+                if len(metadata.replicas) > 0:
+                    encoded.append(struct.pack('>%di' % len(metadata.replicas),
+                                               *metadata.replicas))
+
+                encoded.append(struct.pack('>i', len(metadata.isr)))
+                if len(metadata.isr) > 0:
+                    encoded.append(struct.pack('>%di' % len(metadata.isr),
+                                               *metadata.isr))
+        return b''.join(encoded)
+
+    def test_decode_metadata_response(self):
+        node_brokers = [
+            BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
+            BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
+            BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
+        ]
+
+        topic_partitions = [
+            TopicMetadata(b"topic1", 0, [
+                PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0),
+                PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1), 1)
+            ]),
+            TopicMetadata(b"topic2", 1, [
+                PartitionMetadata(b"topic2", 0, 0, (), (), 0),
+            ]),
+        ]
+        encoded = self._create_encoded_metadata_response(node_brokers,
+                                                         topic_partitions)
+        decoded = KafkaProtocol.decode_metadata_response(encoded)
+        self.assertEqual(decoded, (node_brokers, topic_partitions))
+
+    def test_encode_offset_request(self):
+        expected = b"".join([
+            struct.pack(">i", 21),         # Total length of the request
+            struct.pack('>h', 2),          # Message type = offset fetch
+            struct.pack('>h', 0),          # API version
+            struct.pack('>i', 4),          # Correlation ID
+            struct.pack('>h3s', 3, b"cid"), # The client ID
+            struct.pack('>i', -1),         # Replica Id
+            struct.pack('>i', 0),          # No topic/partitions
+        ])
+
+        encoded = KafkaProtocol.encode_offset_request(b"cid", 4)
+
+        self.assertEqual(encoded, expected)
+
+    def test_encode_offset_request__with_payload(self):
+        expected = b"".join([
+            struct.pack(">i", 65),            # Total length of the request
+
+            struct.pack('>h', 2),             # Message type = offset fetch
+            struct.pack('>h', 0),             # API version
+            struct.pack('>i', 4),             # Correlation ID
+            struct.pack('>h3s', 3, b"cid"),   # The client ID
+            struct.pack('>i', -1),            # Replica Id
+            struct.pack('>i', 1),             # Num topics
+            struct.pack(">h6s", 6, b"topic1"),# Topic for the request
+            struct.pack(">i", 2),             # Two partitions
+
+            struct.pack(">i", 3),             # Partition 3
+            struct.pack(">q", -1),            # No time offset
+            struct.pack(">i", 1),             # One offset requested
+
+            struct.pack(">i", 4),             # Partition 3
+            struct.pack(">q", -1),            # No time offset
+            struct.pack(">i", 1),             # One offset requested
+        ])
+
+        encoded = KafkaProtocol.encode_offset_request(b"cid", 4, [
+            OffsetRequest(b'topic1', 3, -1, 1),
+            OffsetRequest(b'topic1', 4, -1, 1),
+        ])
+
+        self.assertEqual(encoded, expected)
+
+    def test_decode_offset_response(self):
+        encoded = b"".join([
+            struct.pack(">i", 42),            # Correlation ID
+            struct.pack(">i", 1),             # One topics
+            struct.pack(">h6s", 6, b"topic1"),# First topic
+            struct.pack(">i", 2),             # Two partitions
+
+            struct.pack(">i", 2),             # Partition 2
+            struct.pack(">h", 0),             # No error
+            struct.pack(">i", 1),             # One offset
+            struct.pack(">q", 4),             # Offset 4
+
+            struct.pack(">i", 4),             # Partition 4
+            struct.pack(">h", 0),             # No error
+            struct.pack(">i", 1),             # One offset
+            struct.pack(">q", 8),             # Offset 8
+        ])
+
+        results = KafkaProtocol.decode_offset_response(encoded)
+        self.assertEqual(set(results), set([
+            OffsetResponse(topic = b'topic1', partition = 2, error = 0, offsets=(4,)),
+            OffsetResponse(topic = b'topic1', partition = 4, error = 0, offsets=(8,)),
+        ]))
+
+    def test_encode_offset_commit_request(self):
+        header = b"".join([
+            struct.pack('>i', 99),               # Total message length
+
+            struct.pack('>h', 8),                # Message type = offset commit
+            struct.pack('>h', 0),                # API version
+            struct.pack('>i', 42),               # Correlation ID
+            struct.pack('>h9s', 9, b"client_id"),# The client ID
+            struct.pack('>h8s', 8, b"group_id"), # The group to commit for
+            struct.pack('>i', 2),                # Num topics
+        ])
+
+        topic1 = b"".join([
+            struct.pack(">h6s", 6, b"topic1"),   # Topic for the request
+            struct.pack(">i", 2),                # Two partitions
+            struct.pack(">i", 0),                # Partition 0
+            struct.pack(">q", 123),              # Offset 123
+            struct.pack(">h", -1),               # Null metadata
+            struct.pack(">i", 1),                # Partition 1
+            struct.pack(">q", 234),              # Offset 234
+            struct.pack(">h", -1),               # Null metadata
+        ])
+
+        topic2 = b"".join([
+            struct.pack(">h6s", 6, b"topic2"),   # Topic for the request
+            struct.pack(">i", 1),                # One partition
+            struct.pack(">i", 2),                # Partition 2
+            struct.pack(">q", 345),              # Offset 345
+            struct.pack(">h", -1),               # Null metadata
+        ])
+
+        expected1 = b"".join([ header, topic1, topic2 ])
+        expected2 = b"".join([ header, topic2, topic1 ])
+
+        encoded = KafkaProtocol.encode_offset_commit_request(b"client_id", 42, b"group_id", [
+            OffsetCommitRequest(b"topic1", 0, 123, None),
+            OffsetCommitRequest(b"topic1", 1, 234, None),
+            OffsetCommitRequest(b"topic2", 2, 345, None),
+        ])
+
+        self.assertIn(encoded, [ expected1, expected2 ])
+
+    def test_decode_offset_commit_response(self):
+        encoded = b"".join([
+            struct.pack(">i", 42),            # Correlation ID
+            struct.pack(">i", 1),             # One topic
+            struct.pack(">h6s", 6, b"topic1"),# First topic
+            struct.pack(">i", 2),             # Two partitions
+
+            struct.pack(">i", 2),             # Partition 2
+            struct.pack(">h", 0),             # No error
+
+            struct.pack(">i", 4),             # Partition 4
+            struct.pack(">h", 0),             # No error
+        ])
+
+        results = KafkaProtocol.decode_offset_commit_response(encoded)
+        self.assertEqual(set(results), set([
+            OffsetCommitResponse(topic = b'topic1', partition = 2, error = 0),
+            OffsetCommitResponse(topic = b'topic1', partition = 4, error = 0),
+        ]))
+
+    def test_encode_offset_fetch_request(self):
+        header = b"".join([
+            struct.pack('>i', 69),               # Total message length
+            struct.pack('>h', 9),                # Message type = offset fetch
+            struct.pack('>h', 0),                # API version
+            struct.pack('>i', 42),               # Correlation ID
+            struct.pack('>h9s', 9, b"client_id"),# The client ID
+            struct.pack('>h8s', 8, b"group_id"), # The group to commit for
+            struct.pack('>i', 2),                # Num topics
+        ])
+
+        topic1 = b"".join([
+            struct.pack(">h6s", 6, b"topic1"),   # Topic for the request
+            struct.pack(">i", 2),                # Two partitions
+            struct.pack(">i", 0),                # Partition 0
+            struct.pack(">i", 1),                # Partition 1
+        ])
+
+        topic2 = b"".join([
+            struct.pack(">h6s", 6, b"topic2"),   # Topic for the request
+            struct.pack(">i", 1),                # One partitions
+            struct.pack(">i", 2),                # Partition 2
+        ])
+
+        expected1 = b"".join([ header, topic1, topic2 ])
+        expected2 = b"".join([ header, topic2, topic1 ])
+
+        encoded = KafkaProtocol.encode_offset_fetch_request(b"client_id", 42, b"group_id", [
+            OffsetFetchRequest(b"topic1", 0),
+            OffsetFetchRequest(b"topic1", 1),
+            OffsetFetchRequest(b"topic2", 2),
+        ])
+
+        self.assertIn(encoded, [ expected1, expected2 ])
+
+    def test_decode_offset_fetch_response(self):
+        encoded = b"".join([
+            struct.pack(">i", 42),            # Correlation ID
+            struct.pack(">i", 1),             # One topics
+            struct.pack(">h6s", 6, b"topic1"),# First topic
+            struct.pack(">i", 2),             # Two partitions
+
+            struct.pack(">i", 2),             # Partition 2
+            struct.pack(">q", 4),             # Offset 4
+            struct.pack(">h4s", 4, b"meta"),  # Metadata
+            struct.pack(">h", 0),             # No error
+
+            struct.pack(">i", 4),             # Partition 4
+            struct.pack(">q", 8),             # Offset 8
+            struct.pack(">h4s", 4, b"meta"),  # Metadata
+            struct.pack(">h", 0),             # No error
+        ])
+
+        results = KafkaProtocol.decode_offset_fetch_response(encoded)
+        self.assertEqual(set(results), set([
+            OffsetFetchResponse(topic = b'topic1', partition = 2, offset = 4, error = 0, metadata = b"meta"),
+            OffsetFetchResponse(topic = b'topic1', partition = 4, offset = 8, error = 0, metadata = b"meta"),
+        ]))
+
+    @contextmanager
+    def mock_create_message_fns(self):
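+        # Patch the message factory functions in kafka.protocol so that
+        # create_message_set() returns sentinel objects; this lets the
+        # codec-dispatch logic below be verified without real encoding.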
+        import kafka.protocol
+        with patch.object(kafka.protocol, "create_message",
+                               return_value=sentinel.message):
+            with patch.object(kafka.protocol, "create_gzip_message",
+                                   return_value=sentinel.gzip_message):
+                with patch.object(kafka.protocol, "create_snappy_message",
+                                       return_value=sentinel.snappy_message):
+                    yield
+
+    def test_create_message_set(self):
+        messages = [1, 2, 3]
+
+        # Default codec is CODEC_NONE. Expect list of regular messages.
+        expect = [sentinel.message] * len(messages)
+        with self.mock_create_message_fns():
+            message_set = create_message_set(messages)
+        self.assertEqual(message_set, expect)
+
+        # CODEC_NONE: Expect list of regular messages.
+        expect = [sentinel.message] * len(messages)
+        with self.mock_create_message_fns():
+            message_set = create_message_set(messages, CODEC_NONE)
+        self.assertEqual(message_set, expect)
+
+        # CODEC_GZIP: Expect list of one gzip-encoded message.
+        expect = [sentinel.gzip_message]
+        with self.mock_create_message_fns():
+            message_set = create_message_set(messages, CODEC_GZIP)
+        self.assertEqual(message_set, expect)
+
+        # CODEC_SNAPPY: Expect list of one snappy-encoded message.
+        expect = [sentinel.snappy_message]
+        with self.mock_create_message_fns():
+            message_set = create_message_set(messages, CODEC_SNAPPY)
+        self.assertEqual(message_set, expect)
+
+        # Unknown codec should raise UnsupportedCodecError.
+        with self.assertRaises(UnsupportedCodecError):
+            create_message_set(messages, -1)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_util.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_util.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_util.py
new file mode 100644
index 0000000..6a8f45b
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_util.py
@@ -0,0 +1,126 @@
+# -*- coding: utf-8 -*-
+import struct
+
+import six
+from . import unittest
+
+import kafka.common
+import kafka.util
+
+
+class UtilTest(unittest.TestCase):
+    @unittest.skip("Unwritten")
+    def test_relative_unpack(self):
+        pass
+
+    def test_write_int_string(self):
+        self.assertEqual(
+            kafka.util.write_int_string(b'some string'),
+            b'\x00\x00\x00\x0bsome string'
+        )
+
+    def test_write_int_string__unicode(self):
+        with self.assertRaises(TypeError) as cm:
+            kafka.util.write_int_string(u'unicode')
+        #: :type: TypeError
+        te = cm.exception
+        if six.PY2:
+            self.assertIn('unicode', str(te))
+        else:
+            self.assertIn('str', str(te))
+        self.assertIn('to be bytes', str(te))
+
+    def test_write_int_string__empty(self):
+        self.assertEqual(
+            kafka.util.write_int_string(b''),
+            b'\x00\x00\x00\x00'
+        )
+
+    def test_write_int_string__null(self):
+        self.assertEqual(
+            kafka.util.write_int_string(None),
+            b'\xff\xff\xff\xff'
+        )
+
+    def test_read_int_string(self):
+        self.assertEqual(kafka.util.read_int_string(b'\xff\xff\xff\xff', 0), (None, 4))
+        self.assertEqual(kafka.util.read_int_string(b'\x00\x00\x00\x00', 0), (b'', 4))
+        self.assertEqual(kafka.util.read_int_string(b'\x00\x00\x00\x0bsome string', 0), (b'some string', 15))
+
+    def test_read_int_string__insufficient_data(self):
+        with self.assertRaises(kafka.common.BufferUnderflowError):
+            kafka.util.read_int_string(b'\x00\x00\x00\x021', 0)
+
+    def test_write_short_string(self):
+        self.assertEqual(
+            kafka.util.write_short_string(b'some string'),
+            b'\x00\x0bsome string'
+        )
+
+    def test_write_short_string__unicode(self):
+        with self.assertRaises(TypeError) as cm:
+            kafka.util.write_short_string(u'hello')
+        #: :type: TypeError
+        te = cm.exception
+        if six.PY2:
+            self.assertIn('unicode', str(te))
+        else:
+            self.assertIn('str', str(te))
+        self.assertIn('to be bytes', str(te))
+
+    def test_write_short_string__empty(self):
+        self.assertEqual(
+            kafka.util.write_short_string(b''),
+            b'\x00\x00'
+        )
+
+    def test_write_short_string__null(self):
+        self.assertEqual(
+            kafka.util.write_short_string(None),
+            b'\xff\xff'
+        )
+
+    def test_write_short_string__too_long(self):
+        with self.assertRaises(struct.error):
+            kafka.util.write_short_string(b' ' * 33000)
+
+    def test_read_short_string(self):
+        self.assertEqual(kafka.util.read_short_string(b'\xff\xff', 0), (None, 2))
+        self.assertEqual(kafka.util.read_short_string(b'\x00\x00', 0), (b'', 2))
+        self.assertEqual(kafka.util.read_short_string(b'\x00\x0bsome string', 0), (b'some string', 13))
+
+    def test_read_int_string__insufficient_data2(self):
+        with self.assertRaises(kafka.common.BufferUnderflowError):
+            kafka.util.read_int_string(b'\x00\x021', 0)
+
+    def test_relative_unpack2(self):
+        self.assertEqual(
+            kafka.util.relative_unpack('>hh', b'\x00\x01\x00\x00\x02', 0),
+            ((1, 0), 4)
+        )
+
+    def test_relative_unpack3(self):
+        with self.assertRaises(kafka.common.BufferUnderflowError):
+            kafka.util.relative_unpack('>hh', b'\x00', 0)
+
+    def test_group_by_topic_and_partition(self):
+        t = kafka.common.TopicAndPartition
+
+        l = [
+            t("a", 1),
+            t("a", 1),
+            t("a", 2),
+            t("a", 3),
+            t("b", 3),
+        ]
+
+        self.assertEqual(kafka.util.group_by_topic_and_partition(l), {
+            "a": {
+                1: t("a", 1),
+                2: t("a", 2),
+                3: t("a", 3),
+            },
+            "b": {
+                3: t("b", 3),
+            }
+        })


