eagle-commits mailing list archives

From yonzhang2...@apache.org
Subject [5/7] incubator-eagle git commit: EAGLE-120 EAGLE-100 initial system and hadoop metric initial system and hadoop metric https://issues.apache.org/jira/browse/EAGLE-120 Author: qingwen220 qingwzhao@ebay.com Reviewer: yonzhang2012 yonzhang2012@apache.org C
Date Wed, 13 Jan 2016 01:08:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/consumer/multiprocess.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/consumer/multiprocess.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/consumer/multiprocess.py
new file mode 100644
index 0000000..4dc04dc
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/consumer/multiprocess.py
@@ -0,0 +1,251 @@
+from __future__ import absolute_import
+
+import logging
+import time
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+
+try:
+    from Queue import Empty
+except ImportError:  # python 3
+    from queue import Empty
+
+from .base import (
+    AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
+    NO_MESSAGES_WAIT_TIME_SECONDS
+)
+from .simple import Consumer, SimpleConsumer
+
+log = logging.getLogger("kafka")
+
+
+def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
+    """
+    A child process worker which consumes messages based on the
+    notifications given by the controller process
+
+    NOTE: Ideally, this would be a method inside the Consumer class.
+    However, the multiprocessing module has issues on Windows, and the
+    functionality breaks unless this function is kept outside of a class.
+    """
+
+    # Make the child processes open separate socket connections
+    client.reinit()
+
+    # We will start consumers without auto-commit. Auto-commit will be
+    # done by the master controller process.
+    consumer = SimpleConsumer(client, group, topic,
+                              partitions=chunk,
+                              auto_commit=False,
+                              auto_commit_every_n=None,
+                              auto_commit_every_t=None)
+
+    # Ensure that the consumer provides the partition information
+    consumer.provide_partition_info()
+
+    while True:
+        # Wait until the controller tells us to start consuming
+        start.wait()
+
+        # If we are asked to quit, do so
+        if exit.is_set():
+            break
+
+        # Consume messages and add them to the queue. If the controller
+        # indicates a specific number of messages, follow that advice
+        count = 0
+
+        message = consumer.get_message()
+        if message:
+            queue.put(message)
+            count += 1
+
+            # We have reached the required size. The controller might have
+            # more than it needs, so wait for a while.
+            # Without this logic, it is possible that we run into a big
+            # loop consuming all available messages before the controller
+            # can reset the 'start' event
+            if count == size.value:
+                pause.wait()
+
+        else:
+            # In case we did not receive any message, give up the CPU for
+            # a while before we try again
+            time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+
+    consumer.stop()
+
+
+class MultiProcessConsumer(Consumer):
+    """
+    A consumer implementation that consumes partitions for a topic in
+    parallel using multiple processes
+
+    Arguments:
+        client: a connected KafkaClient
+        group: a name for this consumer, used for offset storage and must be unique
+        topic: the topic to consume
+
+    Keyword Arguments:
+        auto_commit: default True. Whether or not to auto commit the offsets
+        auto_commit_every_n: default 100. How many messages to consume
+            before a commit
+        auto_commit_every_t: default 5000. How much time (in milliseconds) to
+            wait before commit
+        num_procs: Number of processes to start for consuming messages.
+            The available partitions will be divided among these processes
+        partitions_per_proc: Number of partitions to be allocated per process
+            (overrides num_procs)
+
+    Auto commit details:
+    If both auto_commit_every_n and auto_commit_every_t are set, they will
+    reset one another when one is triggered. These triggers simply call the
+    commit method on this class. A manual call to commit will also reset
+    these triggers
+    """
+    def __init__(self, client, group, topic, auto_commit=True,
+                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
+                 auto_commit_every_t=AUTO_COMMIT_INTERVAL,
+                 num_procs=1, partitions_per_proc=0):
+
+        # Initiate the base consumer class
+        super(MultiProcessConsumer, self).__init__(
+            client, group, topic,
+            partitions=None,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)
+
+        # Variables for managing and controlling the data flow from
+        # consumer child process to master
+        self.queue = MPQueue(1024)  # Child consumers dump messages into this
+        self.start = Event()        # Indicates the consumers to start fetch
+        self.exit = Event()         # Requests the consumers to shutdown
+        self.pause = Event()        # Requests the consumers to pause fetch
+        self.size = Value('i', 0)   # Indicator of number of messages to fetch
+
+        partitions = self.offsets.keys()
+
+        # If unspecified, start one consumer per partition
+        # The logic below ensures that
+        # * we do not cross the num_procs limit
+        # * we have an even distribution of partitions among processes
+        if not partitions_per_proc:
+            partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
+            if partitions_per_proc < num_procs * 0.5:
+                partitions_per_proc += 1
+
+        # The final set of chunks
+        chunker = lambda *x: [] + list(x)
+        chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
+
+        self.procs = []
+        for chunk in chunks:
+            chunk = filter(lambda x: x is not None, chunk)
+            args = (client.copy(),
+                    group, topic, list(chunk),
+                    self.queue, self.start, self.exit,
+                    self.pause, self.size)
+
+            proc = Process(target=_mp_consume, args=args)
+            proc.daemon = True
+            proc.start()
+            self.procs.append(proc)
+
+    def __repr__(self):
+        return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
+            (self.group, self.topic, len(self.procs))
+
+    def stop(self):
+        # Set exit and start off all waiting consumers
+        self.exit.set()
+        self.pause.set()
+        self.start.set()
+
+        for proc in self.procs:
+            proc.join()
+            proc.terminate()
+
+        super(MultiProcessConsumer, self).stop()
+
+    def __iter__(self):
+        """
+        Iterator to consume the messages available on this consumer
+        """
+        # Trigger the consumer procs to start off.
+        # We will iterate till there are no more messages available
+        self.size.value = 0
+        self.pause.set()
+
+        while True:
+            self.start.set()
+            try:
+                # We will block for a small while so that the consumers get
+                # a chance to run and put some messages in the queue
+                # TODO: This is a hack and will make the consumer block for
+                # at least one second. Need to find a better way of doing this
+                partition, message = self.queue.get(block=True, timeout=1)
+            except Empty:
+                break
+
+            # Count, check and commit messages if necessary
+            self.offsets[partition] = message.offset + 1
+            self.start.clear()
+            self.count_since_commit += 1
+            self._auto_commit()
+            yield message
+
+        self.start.clear()
+
+    def get_messages(self, count=1, block=True, timeout=10):
+        """
+        Fetch the specified number of messages
+
+        Keyword Arguments:
+            count: Indicates the maximum number of messages to be fetched
+            block: If True, the API will block till some messages are fetched.
+            timeout: If block is True, the function will block for the specified
+                time (in seconds) until count messages are fetched. If None,
+                it will block forever.
+        """
+        messages = []
+
+        # Give a size hint to the consumers. Each consumer process will fetch
+        # a maximum of "count" messages. This will fetch more messages than
+        # necessary, but these will not be committed to kafka. Also, the extra
+        # messages can be provided in subsequent runs
+        self.size.value = count
+        self.pause.clear()
+
+        if timeout is not None:
+            max_time = time.time() + timeout
+
+        new_offsets = {}
+        while count > 0 and (timeout is None or timeout > 0):
+            # Trigger consumption only if the queue is empty
+            # By doing this, we will ensure that consumers do not
+            # go into overdrive and keep consuming thousands of
+            # messages when the user might need only a few
+            if self.queue.empty():
+                self.start.set()
+
+            try:
+                partition, message = self.queue.get(block, timeout)
+            except Empty:
+                break
+
+            messages.append(message)
+            new_offsets[partition] = message.offset + 1
+            count -= 1
+            if timeout is not None:
+                timeout = max_time - time.time()
+
+        self.size.value = 0
+        self.start.clear()
+        self.pause.set()
+
+        # Update and commit offsets if necessary
+        self.offsets.update(new_offsets)
+        self.count_since_commit += len(messages)
+        self._auto_commit()
+
+        return messages
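
For reference, a minimal usage sketch of the MultiProcessConsumer added above. The broker
address, group, and topic below are placeholders, and KafkaClient is assumed to be the
client class from the same vendored kafka-python package (accepting a host:port string):

    from kafka.client import KafkaClient
    from kafka.consumer.multiprocess import MultiProcessConsumer

    client = KafkaClient("localhost:9092")   # placeholder broker address
    consumer = MultiProcessConsumer(client, "eagle-group", "hadoop_metric",
                                    num_procs=2)

    # Fetch a bounded batch; the size hint keeps the worker processes from
    # consuming far more than the caller asked for.
    for message in consumer.get_messages(count=10, block=True, timeout=5):
        print(message.message.value)

    consumer.stop()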

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/consumer/simple.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/consumer/simple.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/consumer/simple.py
new file mode 100644
index 0000000..000fcd9
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/consumer/simple.py
@@ -0,0 +1,330 @@
+from __future__ import absolute_import
+
+try:
+    from itertools import zip_longest as izip_longest, repeat  # pylint: disable-msg=E0611
+except ImportError:  # python 2
+    from itertools import izip_longest as izip_longest, repeat
+import logging
+import time
+
+import six
+
+try:
+    from Queue import Empty, Queue
+except ImportError:  # python 3
+    from queue import Empty, Queue
+
+from kafka.common import (
+    FetchRequest, OffsetRequest,
+    ConsumerFetchSizeTooSmall, ConsumerNoMoreData
+)
+from .base import (
+    Consumer,
+    FETCH_DEFAULT_BLOCK_TIMEOUT,
+    AUTO_COMMIT_MSG_COUNT,
+    AUTO_COMMIT_INTERVAL,
+    FETCH_MIN_BYTES,
+    FETCH_BUFFER_SIZE_BYTES,
+    MAX_FETCH_BUFFER_SIZE_BYTES,
+    FETCH_MAX_WAIT_TIME,
+    ITER_TIMEOUT_SECONDS,
+    NO_MESSAGES_WAIT_TIME_SECONDS
+)
+
+log = logging.getLogger("kafka")
+
+class FetchContext(object):
+    """
+    Class for managing the state of a consumer during fetch
+    """
+    def __init__(self, consumer, block, timeout):
+        self.consumer = consumer
+        self.block = block
+
+        if block:
+            if not timeout:
+                timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+            self.timeout = timeout * 1000
+
+    def __enter__(self):
+        """Set fetch values based on blocking status"""
+        self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
+        self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
+        if self.block:
+            self.consumer.fetch_max_wait_time = self.timeout
+            self.consumer.fetch_min_bytes = 1
+        else:
+            self.consumer.fetch_min_bytes = 0
+
+    def __exit__(self, type, value, traceback):
+        """Reset values"""
+        self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
+        self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
+
+
+class SimpleConsumer(Consumer):
+    """
+    A simple consumer implementation that consumes all/specified partitions
+    for a topic
+
+    Arguments:
+        client: a connected KafkaClient
+        group: a name for this consumer, used for offset storage and must be unique
+        topic: the topic to consume
+
+    Keyword Arguments:
+        partitions: An optional list of partitions to consume the data from
+
+        auto_commit: default True. Whether or not to auto commit the offsets
+
+        auto_commit_every_n: default 100. How many messages to consume
+             before a commit
+
+        auto_commit_every_t: default 5000. How much time (in milliseconds) to
+             wait before commit
+        fetch_size_bytes: number of bytes to request in a FetchRequest
+
+        buffer_size: default 4K. Initial number of bytes to tell kafka we
+             have available. This will double as needed.
+
+        max_buffer_size: default 16K. Max number of bytes to tell kafka we have
+             available. None means no limit.
+
+        iter_timeout: default None. How much time (in seconds) to wait for a
+             message in the iterator before exiting. None means no
+             timeout, so it will wait forever.
+
+    Auto commit details:
+    If both auto_commit_every_n and auto_commit_every_t are set, they will
+    reset one another when one is triggered. These triggers simply call the
+    commit method on this class. A manual call to commit will also reset
+    these triggers
+    """
+    def __init__(self, client, group, topic, auto_commit=True, partitions=None,
+                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
+                 auto_commit_every_t=AUTO_COMMIT_INTERVAL,
+                 fetch_size_bytes=FETCH_MIN_BYTES,
+                 buffer_size=FETCH_BUFFER_SIZE_BYTES,
+                 max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
+                 iter_timeout=None):
+        super(SimpleConsumer, self).__init__(
+            client, group, topic,
+            partitions=partitions,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)
+
+        if max_buffer_size is not None and buffer_size > max_buffer_size:
+            raise ValueError("buffer_size (%d) is greater than "
+                             "max_buffer_size (%d)" %
+                             (buffer_size, max_buffer_size))
+        self.buffer_size = buffer_size
+        self.max_buffer_size = max_buffer_size
+        self.partition_info = False     # Do not return partition info in msgs
+        self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
+        self.fetch_min_bytes = fetch_size_bytes
+        self.fetch_offsets = self.offsets.copy()
+        self.iter_timeout = iter_timeout
+        self.queue = Queue()
+
+    def __repr__(self):
+        return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
+            (self.group, self.topic, str(self.offsets.keys()))
+
+    def provide_partition_info(self):
+        """
+        Indicates that partition info must be returned by the consumer
+        """
+        self.partition_info = True
+
+    def seek(self, offset, whence):
+        """
+        Alter the current offset in the consumer, similar to fseek
+
+        Arguments:
+            offset: how much to modify the offset
+            whence: where to modify it from
+
+                * 0 is relative to the earliest available offset (head)
+                * 1 is relative to the current offset
+                * 2 is relative to the latest known offset (tail)
+        """
+
+        if whence == 1:  # relative to current position
+            for partition, _offset in self.offsets.items():
+                self.offsets[partition] = _offset + offset
+        elif whence in (0, 2):  # relative to beginning or end
+            # divide the requested offset by the number of partitions and
+            # distribute the remainder evenly
+            (delta, rem) = divmod(offset, len(self.offsets))
+            deltas = {}
+            for partition, r in izip_longest(self.offsets.keys(),
+                                             repeat(1, rem), fillvalue=0):
+                deltas[partition] = delta + r
+
+            reqs = []
+            for partition in self.offsets.keys():
+                if whence == 0:
+                    reqs.append(OffsetRequest(self.topic, partition, -2, 1))
+                elif whence == 2:
+                    reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+                else:
+                    pass
+
+            resps = self.client.send_offset_request(reqs)
+            for resp in resps:
+                self.offsets[resp.partition] = \
+                    resp.offsets[0] + deltas[resp.partition]
+        else:
+            raise ValueError("Unexpected value for `whence`, %d" % whence)
+
+        # Reset queue and fetch offsets since they are invalid
+        self.fetch_offsets = self.offsets.copy()
+        if self.auto_commit:
+            self.count_since_commit += 1
+            self.commit()
+
+        self.queue = Queue()
+
+    def get_messages(self, count=1, block=True, timeout=0.1):
+        """
+        Fetch the specified number of messages
+
+        Keyword Arguments:
+            count: Indicates the maximum number of messages to be fetched
+            block: If True, the API will block till some messages are fetched.
+            timeout: If block is True, the function will block for the specified
+                time (in seconds) until count messages are fetched. If None,
+                it will block forever.
+        """
+        messages = []
+        if timeout is not None:
+            max_time = time.time() + timeout
+
+        new_offsets = {}
+        while count > 0 and (timeout is None or timeout > 0):
+            result = self._get_message(block, timeout, get_partition_info=True,
+                                       update_offset=False)
+            if result:
+                partition, message = result
+                if self.partition_info:
+                    messages.append(result)
+                else:
+                    messages.append(message)
+                new_offsets[partition] = message.offset + 1
+                count -= 1
+            else:
+                # Ran out of messages for the last request.
+                if not block:
+                    # If we're not blocking, break.
+                    break
+
+            # If we have a timeout, reduce it to the
+            # appropriate value
+            if timeout is not None:
+                timeout = max_time - time.time()
+
+        # Update and commit offsets if necessary
+        self.offsets.update(new_offsets)
+        self.count_since_commit += len(messages)
+        self._auto_commit()
+        return messages
+
+    def get_message(self, block=True, timeout=0.1, get_partition_info=None):
+        return self._get_message(block, timeout, get_partition_info)
+
+    def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
+                     update_offset=True):
+        """
+        If no messages can be fetched, returns None.
+        If get_partition_info is None, it defaults to self.partition_info
+        If get_partition_info is True, returns (partition, message)
+        If get_partition_info is False, returns message
+        """
+        if self.queue.empty():
+            # We're out of messages, go grab some more.
+            with FetchContext(self, block, timeout):
+                self._fetch()
+        try:
+            partition, message = self.queue.get_nowait()
+
+            if update_offset:
+                # Update partition offset
+                self.offsets[partition] = message.offset + 1
+
+                # Count, check and commit messages if necessary
+                self.count_since_commit += 1
+                self._auto_commit()
+
+            if get_partition_info is None:
+                get_partition_info = self.partition_info
+            if get_partition_info:
+                return partition, message
+            else:
+                return message
+        except Empty:
+            return None
+
+    def __iter__(self):
+        if self.iter_timeout is None:
+            timeout = ITER_TIMEOUT_SECONDS
+        else:
+            timeout = self.iter_timeout
+
+        while True:
+            message = self.get_message(True, timeout)
+            if message:
+                yield message
+            elif self.iter_timeout is None:
+                # We did not receive any message yet but we don't have a
+                # timeout, so give up the CPU for a while before trying again
+                time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+            else:
+                # Timed out waiting for a message
+                break
+
+    def _fetch(self):
+        # Create fetch request payloads for all the partitions
+        partitions = dict((p, self.buffer_size)
+                      for p in self.fetch_offsets.keys())
+        while partitions:
+            requests = []
+            for partition, buffer_size in six.iteritems(partitions):
+                requests.append(FetchRequest(self.topic, partition,
+                                             self.fetch_offsets[partition],
+                                             buffer_size))
+            # Send request
+            responses = self.client.send_fetch_request(
+                requests,
+                max_wait_time=int(self.fetch_max_wait_time),
+                min_bytes=self.fetch_min_bytes)
+
+            retry_partitions = {}
+            for resp in responses:
+                partition = resp.partition
+                buffer_size = partitions[partition]
+                try:
+                    for message in resp.messages:
+                        # Put the message in our queue
+                        self.queue.put((partition, message))
+                        self.fetch_offsets[partition] = message.offset + 1
+                except ConsumerFetchSizeTooSmall:
+                    if (self.max_buffer_size is not None and
+                            buffer_size == self.max_buffer_size):
+                        log.error("Max fetch size %d too small",
+                                  self.max_buffer_size)
+                        raise
+                    if self.max_buffer_size is None:
+                        buffer_size *= 2
+                    else:
+                        buffer_size = min(buffer_size * 2,
+                                          self.max_buffer_size)
+                    log.warn("Fetch size too small, increase to %d (2x) "
+                             "and retry", buffer_size)
+                    retry_partitions[partition] = buffer_size
+                except ConsumerNoMoreData as e:
+                    log.debug("Iteration was ended by %r", e)
+                except StopIteration:
+                    # Stop iterating through this partition
+                    log.debug("Done iterating over partition %s" % partition)
+            partitions = retry_partitions
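
A similar hedged sketch for the SimpleConsumer above (same placeholder broker, group,
and topic; seek(0, 0) rewinds to the earliest available offset per the docstring):

    from kafka.client import KafkaClient
    from kafka.consumer.simple import SimpleConsumer

    client = KafkaClient("localhost:9092")   # placeholder broker address
    consumer = SimpleConsumer(client, "eagle-group", "hadoop_metric",
                              auto_commit=True, auto_commit_every_n=100)

    consumer.seek(0, 0)                      # whence=0: earliest available offset
    for msg in consumer.get_messages(count=5, block=True, timeout=1):
        print(msg.offset, msg.message.value)

    consumer.stop()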

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/context.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/context.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/context.py
new file mode 100644
index 0000000..ade4db8
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/context.py
@@ -0,0 +1,175 @@
+"""
+Context manager to commit/rollback consumer offsets.
+"""
+from logging import getLogger
+
+from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError
+
+
+class OffsetCommitContext(object):
+    """
+    Provides commit/rollback semantics around a `SimpleConsumer`.
+
+    Usage assumes that `auto_commit` is disabled, that messages are consumed in
+    batches, and that the consuming process will record its own successful
+    processing of each message. Both the commit and rollback operations respect
+    a "high-water mark" to ensure that the last unsuccessfully processed message
+    will be retried.
+
+    Example:
+
+    .. code:: python
+
+        consumer = SimpleConsumer(client, group, topic, auto_commit=False)
+        consumer.provide_partition_info()
+        consumer.fetch_last_known_offsets()
+
+        while some_condition:
+            with OffsetCommitContext(consumer) as context:
+                messages = consumer.get_messages(count, block=False)
+
+                for partition, message in messages:
+                    if can_process(message):
+                        context.mark(partition, message.offset)
+                    else:
+                        break
+
+                if not context:
+                    sleep(delay)
+
+
+    These semantics allow for deferred message processing (e.g. if `can_process`
+    compares message time to clock time) and for repeated processing of the last
+    unsuccessful message (until some external error is resolved).
+    """
+
+    def __init__(self, consumer):
+        """
+        :param consumer: an instance of `SimpleConsumer`
+        """
+        self.consumer = consumer
+        self.initial_offsets = None
+        self.high_water_mark = None
+        self.logger = getLogger("kafka.context")
+
+    def mark(self, partition, offset):
+        """
+        Set the high-water mark in the current context.
+
+        In order to know the current partition, it is helpful to initialize
+        the consumer to provide partition info via:
+
+        .. code:: python
+
+            consumer.provide_partition_info()
+
+        """
+        max_offset = max(offset + 1, self.high_water_mark.get(partition, 0))
+
+        self.logger.debug("Setting high-water mark to: %s",
+                          {partition: max_offset})
+
+        self.high_water_mark[partition] = max_offset
+
+    def __nonzero__(self):
+        """
+        Return whether any operations were marked in the context.
+        """
+        return bool(self.high_water_mark)
+
+    def __enter__(self):
+        """
+        Start a new context:
+
+         -  Record the initial offsets for rollback
+         -  Reset the high-water mark
+        """
+        self.initial_offsets = dict(self.consumer.offsets)
+        self.high_water_mark = dict()
+
+        self.logger.debug("Starting context at: %s", self.initial_offsets)
+
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """
+        End a context.
+
+         -  If there was no exception, commit up to the current high-water mark.
+         -  If there was an offset out of range error, attempt to find the correct
+            initial offset.
+         -  If there was any other error, roll back to the initial offsets.
+        """
+        if exc_type is None:
+            self.commit()
+        elif isinstance(exc_value, OffsetOutOfRangeError):
+            self.handle_out_of_range()
+            return True
+        else:
+            self.rollback()
+
+    def commit(self):
+        """
+        Commit this context's offsets:
+
+         -  If the high-water mark has moved, commit up to and position the
+            consumer at the high-water mark.
+         -  Otherwise, reset the consumer to the initial offsets.
+        """
+        if self.high_water_mark:
+            self.logger.info("Committing offsets: %s", self.high_water_mark)
+            self.commit_partition_offsets(self.high_water_mark)
+            self.update_consumer_offsets(self.high_water_mark)
+        else:
+            self.update_consumer_offsets(self.initial_offsets)
+
+    def rollback(self):
+        """
+        Rollback this context:
+
+         -  Position the consumer at the initial offsets.
+        """
+        self.logger.info("Rolling back context: %s", self.initial_offsets)
+        self.update_consumer_offsets(self.initial_offsets)
+
+    def commit_partition_offsets(self, partition_offsets):
+        """
+        Commit explicit partition/offset pairs.
+        """
+        self.logger.debug("Committing partition offsets: %s", partition_offsets)
+
+        commit_requests = [
+            OffsetCommitRequest(self.consumer.topic, partition, offset, None)
+            for partition, offset in partition_offsets.items()
+        ]
+        commit_responses = self.consumer.client.send_offset_commit_request(
+            self.consumer.group,
+            commit_requests,
+        )
+        for commit_response in commit_responses:
+            check_error(commit_response)
+
+    def update_consumer_offsets(self, partition_offsets):
+        """
+        Update consumer offsets to explicit positions.
+        """
+        self.logger.debug("Updating consumer offsets to: %s", partition_offsets)
+
+        for partition, offset in partition_offsets.items():
+            self.consumer.offsets[partition] = offset
+
+        # consumer keeps other offset states beyond its `offsets` dictionary,
+        # a relative seek with zero delta forces the consumer to reset to the
+        # current value of the `offsets` dictionary
+        self.consumer.seek(0, 1)
+
+    def handle_out_of_range(self):
+        """
+        Handle out of range condition by seeking to the beginning of valid
+        ranges.
+
+        This assumes that an out-of-range error doesn't happen by seeking past the end
+        of valid ranges -- which is far less likely.
+        """
+        self.logger.info("Seeking beginning of partition on out of range error")
+        self.consumer.seek(0, 0)
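
The class docstring above already shows the happy path; as a complementary sketch
(same placeholder client, group, and topic), this is how the rollback branch of
__exit__ behaves when processing fails with an ordinary exception:

    from kafka.client import KafkaClient
    from kafka.consumer.simple import SimpleConsumer
    from kafka.context import OffsetCommitContext

    client = KafkaClient("localhost:9092")   # placeholder broker address
    consumer = SimpleConsumer(client, "eagle-group", "hadoop_metric",
                              auto_commit=False)
    consumer.provide_partition_info()
    consumer.fetch_last_known_offsets()

    try:
        with OffsetCommitContext(consumer) as context:
            for partition, message in consumer.get_messages(count=10, block=False):
                context.mark(partition, message.offset)
            raise RuntimeError("processing failed")
    except RuntimeError:
        # __exit__ saw a non-offset error, so rollback() repositioned the
        # consumer at the offsets recorded on __enter__.
        pass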

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/__init__.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/__init__.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/__init__.py
new file mode 100644
index 0000000..fdb19bb
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/__init__.py
@@ -0,0 +1,6 @@
+from .roundrobin import RoundRobinPartitioner
+from .hashed import HashedPartitioner
+
+__all__ = [
+    'RoundRobinPartitioner', 'HashedPartitioner'
+]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/base.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/base.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/base.py
new file mode 100644
index 0000000..857f634
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/base.py
@@ -0,0 +1,24 @@
+
+class Partitioner(object):
+    """
+    Base class for a partitioner
+    """
+    def __init__(self, partitions):
+        """
+        Initialize the partitioner
+
+        Arguments:
+            partitions: A list of available partitions (during startup)
+        """
+        self.partitions = partitions
+
+    def partition(self, key, partitions=None):
+        """
+        Takes a key and an optional list of partitions, and returns
+        the partition to be used for the message
+
+        Arguments:
+            key: the key to use for partitioning
+            partitions: (optional) a list of partitions.
+        """
+        raise NotImplementedError('partition function has to be implemented')
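
A toy subclass, just to illustrate the contract the base class expects (not part of
the library itself):

    from kafka.partitioner.base import Partitioner

    class FirstAvailablePartitioner(Partitioner):
        """Illustrative only: always pick the first available partition."""
        def partition(self, key, partitions=None):
            return (partitions or self.partitions)[0]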

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/hashed.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/hashed.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/hashed.py
new file mode 100644
index 0000000..fb5e598
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/hashed.py
@@ -0,0 +1,14 @@
+from .base import Partitioner
+
+class HashedPartitioner(Partitioner):
+    """
+    Implements a partitioner which selects the target partition based on
+    the hash of the key
+    """
+    def partition(self, key, partitions=None):
+        if not partitions:
+            partitions = self.partitions
+        size = len(partitions)
+        idx = hash(key) % size
+
+        return partitions[idx]
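
For example, the hashed partitioner maps a key to hash(key) % len(partitions),
which is deterministic within a single process:

    from kafka.partitioner.hashed import HashedPartitioner

    partitioner = HashedPartitioner([0, 1, 2, 3])
    # The same key always lands on the same partition within one process
    # (note that Python 3 randomizes hash() of str/bytes across processes).
    assert partitioner.partition("host-01") == partitioner.partition("host-01")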

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/roundrobin.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/roundrobin.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/roundrobin.py
new file mode 100644
index 0000000..6439e53
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/partitioner/roundrobin.py
@@ -0,0 +1,23 @@
+from itertools import cycle
+
+from .base import Partitioner
+
+class RoundRobinPartitioner(Partitioner):
+    """
+    Implements a round robin partitioner which sends data to partitions
+    in a round robin fashion
+    """
+    def __init__(self, partitions):
+        super(RoundRobinPartitioner, self).__init__(partitions)
+        self.iterpart = cycle(partitions)
+
+    def _set_partitions(self, partitions):
+        self.partitions = partitions
+        self.iterpart = cycle(partitions)
+
+    def partition(self, key, partitions=None):
+        # Refresh the partition list if necessary
+        if partitions and self.partitions != partitions:
+            self._set_partitions(partitions)
+
+        return next(self.iterpart)
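
The round-robin partitioner, by contrast, simply cycles through the partition list
and ignores the key:

    from kafka.partitioner.roundrobin import RoundRobinPartitioner

    rr = RoundRobinPartitioner([0, 1, 2])
    print([rr.partition(None) for _ in range(5)])   # -> [0, 1, 2, 0, 1]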

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/__init__.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/__init__.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/__init__.py
new file mode 100644
index 0000000..bc0e7c6
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/__init__.py
@@ -0,0 +1,6 @@
+from .simple import SimpleProducer
+from .keyed import KeyedProducer
+
+__all__ = [
+    'SimpleProducer', 'KeyedProducer'
+]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/base.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/base.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/base.py
new file mode 100644
index 0000000..5b41bc9
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/base.py
@@ -0,0 +1,214 @@
+from __future__ import absolute_import
+
+import logging
+import time
+
+try:
+    from queue import Empty
+except ImportError:
+    from Queue import Empty
+from collections import defaultdict
+from multiprocessing import Queue, Process
+
+import six
+
+from kafka.common import (
+    ProduceRequest, TopicAndPartition, UnsupportedCodecError
+)
+from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
+
+log = logging.getLogger("kafka")
+
+BATCH_SEND_DEFAULT_INTERVAL = 20
+BATCH_SEND_MSG_COUNT = 20
+
+STOP_ASYNC_PRODUCER = -1
+
+
+def _send_upstream(queue, client, codec, batch_time, batch_size,
+                   req_acks, ack_timeout):
+    """
+    Listen on the queue for a specified number of messages or till
+    a specified timeout and send them upstream to the brokers in one
+    request
+
+    NOTE: Ideally, this would be a method inside the Producer class.
+    However, the multiprocessing module has issues on Windows, and the
+    functionality breaks unless this function is kept outside of a class.
+    """
+    stop = False
+    client.reinit()
+
+    while not stop:
+        timeout = batch_time
+        count = batch_size
+        send_at = time.time() + timeout
+        msgset = defaultdict(list)
+
+        # Keep fetching till we gather enough messages or a
+        # timeout is reached
+        while count > 0 and timeout >= 0:
+            try:
+                topic_partition, msg, key = queue.get(timeout=timeout)
+
+            except Empty:
+                break
+
+            # Check if the controller has requested us to stop
+            if topic_partition == STOP_ASYNC_PRODUCER:
+                stop = True
+                break
+
+            # Adjust the timeout to match the remaining period
+            count -= 1
+            timeout = send_at - time.time()
+            msgset[topic_partition].append(msg)
+
+        # Send collected requests upstream
+        reqs = []
+        for topic_partition, msg in msgset.items():
+            messages = create_message_set(msg, codec, key)
+            req = ProduceRequest(topic_partition.topic,
+                                 topic_partition.partition,
+                                 messages)
+            reqs.append(req)
+
+        try:
+            client.send_produce_request(reqs,
+                                        acks=req_acks,
+                                        timeout=ack_timeout)
+        except Exception:
+            log.exception("Unable to send message")
+
+
+class Producer(object):
+    """
+    Base class to be used by producers
+
+    Arguments:
+        client: The Kafka client instance to use
+        async: If set to true, the messages are sent asynchronously via another
+            thread (process), and we will not wait for a response to them.
+            WARNING!!! The current implementation of the async producer does
+            not guarantee message delivery. Use at your own risk, or help us
+            improve it with a PR!
+        req_acks: A value indicating the acknowledgements that the server must
+            receive before responding to the request
+        ack_timeout: Value (in milliseconds) indicating a timeout for waiting
+            for an acknowledgement
+        batch_send: If True, messages are sent in batches
+        batch_send_every_n: If set, messages are sent in batches of this size
+        batch_send_every_t: If set, messages are sent after this timeout
+    """
+
+    ACK_NOT_REQUIRED = 0            # No ack is required
+    ACK_AFTER_LOCAL_WRITE = 1       # Send response after it is written to log
+    ACK_AFTER_CLUSTER_COMMIT = -1   # Send response after data is committed
+
+    DEFAULT_ACK_TIMEOUT = 1000
+
+    def __init__(self, client, async=False,
+                 req_acks=ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=DEFAULT_ACK_TIMEOUT,
+                 codec=None,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+
+        if batch_send:
+            async = True
+            assert batch_send_every_n > 0
+            assert batch_send_every_t > 0
+        else:
+            batch_send_every_n = 1
+            batch_send_every_t = 3600
+
+        self.client = client
+        self.async = async
+        self.req_acks = req_acks
+        self.ack_timeout = ack_timeout
+
+        if codec is None:
+            codec = CODEC_NONE
+        elif codec not in ALL_CODECS:
+            raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+
+        self.codec = codec
+
+        if self.async:
+            log.warning("async producer does not guarantee message delivery!")
+            log.warning("Current implementation does not retry Failed messages")
+            log.warning("Use at your own risk! (or help improve with a PR!)")
+            self.queue = Queue()  # Messages are sent through this queue
+            self.proc = Process(target=_send_upstream,
+                                args=(self.queue,
+                                      self.client.copy(),
+                                      self.codec,
+                                      batch_send_every_t,
+                                      batch_send_every_n,
+                                      self.req_acks,
+                                      self.ack_timeout))
+
+            # Process will die if main thread exits
+            self.proc.daemon = True
+            self.proc.start()
+
+    def send_messages(self, topic, partition, *msg):
+        """
+        Helper method to send produce requests
+        @param: topic, name of topic for produce request -- type str
+        @param: partition, partition number for produce request -- type int
+        @param: *msg, one or more message payloads -- type bytes
+        @returns: ProduceResponse returned by the server
+        raises on error
+
+        Note that the msg payloads *must* be encoded to bytes by the user.
+        Passing a unicode message will not work; encode it before calling
+        send_messages, e.g. via `unicode_message.encode('utf-8')`.
+
+        All messages produced via this method will set the message 'key' to None
+        """
+        return self._send_messages(topic, partition, *msg)
+
+    def _send_messages(self, topic, partition, *msg, **kwargs):
+        key = kwargs.pop('key', None)
+
+        # Guarantee that msg is actually a list or tuple (should always be true)
+        if not isinstance(msg, (list, tuple)):
+            raise TypeError("msg is not a list or tuple!")
+
+        # Raise TypeError if any message is not encoded as bytes
+        if any(not isinstance(m, six.binary_type) for m in msg):
+            raise TypeError("all produce message payloads must be type bytes")
+
+        # Raise TypeError if the key is not encoded as bytes
+        if key is not None and not isinstance(key, six.binary_type):
+            raise TypeError("the key must be type bytes")
+
+        if self.async:
+            for m in msg:
+                self.queue.put((TopicAndPartition(topic, partition), m, key))
+            resp = []
+        else:
+            messages = create_message_set(msg, self.codec, key)
+            req = ProduceRequest(topic, partition, messages)
+            try:
+                resp = self.client.send_produce_request([req], acks=self.req_acks,
+                                                        timeout=self.ack_timeout)
+            except Exception:
+                log.exception("Unable to send messages")
+                raise
+        return resp
+
+    def stop(self, timeout=1):
+        """
+        Stop the producer. Optionally wait for the specified timeout before
+        forcefully cleaning up.
+        """
+        if self.async:
+            self.queue.put((STOP_ASYNC_PRODUCER, None, None))
+            self.proc.join(timeout)
+
+            if self.proc.is_alive():
+                self.proc.terminate()
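
A minimal synchronous sketch of the base Producer (placeholder broker and topic;
payloads must already be bytes, and the caller picks the partition explicitly here):

    from kafka.client import KafkaClient
    from kafka.producer.base import Producer

    client = KafkaClient("localhost:9092")        # placeholder broker address
    producer = Producer(client)                   # synchronous (async=False) by default

    # Topic is passed as bytes to mirror how SimpleProducer normalizes it.
    resp = producer.send_messages(b"hadoop_metric", 0, b"metric-1", b"metric-2")
    producer.stop()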

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/keyed.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/keyed.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/keyed.py
new file mode 100644
index 0000000..36328ed
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/keyed.py
@@ -0,0 +1,68 @@
+from __future__ import absolute_import
+
+import logging
+
+from kafka.partitioner import HashedPartitioner
+from .base import (
+    Producer, BATCH_SEND_DEFAULT_INTERVAL,
+    BATCH_SEND_MSG_COUNT
+)
+
+log = logging.getLogger("kafka")
+
+
+class KeyedProducer(Producer):
+    """
+    A producer which distributes messages to partitions based on the key
+
+    Arguments:
+        client: The kafka client instance
+
+    Keyword Arguments:
+        partitioner: A partitioner class that will be used to get the partition
+            to send the message to. Must be derived from Partitioner
+        async: If True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+        ack_timeout: Value (in milliseconds) indicating a timeout for waiting
+            for an acknowledgement
+        batch_send: If True, messages are sent in batches
+        batch_send_every_n: If set, messages are sent in batches of this size
+        batch_send_every_t: If set, messages are sent after this timeout
+    """
+    def __init__(self, client, partitioner=None, async=False,
+                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 codec=None,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+        if not partitioner:
+            partitioner = HashedPartitioner
+        self.partitioner_class = partitioner
+        self.partitioners = {}
+
+        super(KeyedProducer, self).__init__(client, async, req_acks,
+                                            ack_timeout, codec, batch_send,
+                                            batch_send_every_n,
+                                            batch_send_every_t)
+
+    def _next_partition(self, topic, key):
+        if topic not in self.partitioners:
+            if not self.client.has_metadata_for_topic(topic):
+                self.client.load_metadata_for_topics(topic)
+
+            self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
+
+        partitioner = self.partitioners[topic]
+        return partitioner.partition(key)
+
+    def send_messages(self, topic, key, *msg):
+        partition = self._next_partition(topic, key)
+        return self._send_messages(topic, partition, *msg, key=key)
+
+    def send(self, topic, key, msg):
+        partition = self._next_partition(topic, key)
+        return self._send_messages(topic, partition, msg, key=key)
+
+    def __repr__(self):
+        return '<KeyedProducer batch=%s>' % self.async
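
A hedged usage sketch for the keyed producer (placeholder broker and topic; the key
and payloads must be bytes, as enforced by _send_messages):

    from kafka.client import KafkaClient
    from kafka.producer.keyed import KeyedProducer
    from kafka.partitioner.roundrobin import RoundRobinPartitioner

    client = KafkaClient("localhost:9092")        # placeholder broker address
    producer = KeyedProducer(client, partitioner=RoundRobinPartitioner)

    # The partitioner (HashedPartitioner by default) maps the key to a partition.
    producer.send_messages(b"hadoop_metric", b"host-01", b"cpu=0.42")
    producer.stop()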

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/simple.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/simple.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/simple.py
new file mode 100644
index 0000000..2699cf2
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/producer/simple.py
@@ -0,0 +1,81 @@
+from __future__ import absolute_import
+
+import logging
+import random
+import six
+
+from itertools import cycle
+
+from six.moves import xrange
+
+from .base import (
+    Producer, BATCH_SEND_DEFAULT_INTERVAL,
+    BATCH_SEND_MSG_COUNT
+)
+
+log = logging.getLogger("kafka")
+
+
+class SimpleProducer(Producer):
+    """
+    A simple, round-robin producer. Each message goes to exactly one partition
+
+    Arguments:
+        client: The Kafka client instance to use
+
+    Keyword Arguments:
+        async: If True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+        req_acks: A value indicating the acknowledgements that the server must
+            receive before responding to the request
+        ack_timeout: Value (in milliseconds) indicating a timeout for waiting
+            for an acknowledgement
+        batch_send: If True, messages are sent in batches
+        batch_send_every_n: If set, messages are sent in batches of this size
+        batch_send_every_t: If set, messages are sent after this timeout
+        random_start: If true, randomize the initial partition to which the
+            first message block will be published; otherwise, the first
+            message block will always be published to partition 0 before
+            cycling through each partition
+    """
+    def __init__(self, client, async=False,
+                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 codec=None,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+                 random_start=True):
+        self.partition_cycles = {}
+        self.random_start = random_start
+        super(SimpleProducer, self).__init__(client, async, req_acks,
+                                             ack_timeout, codec, batch_send,
+                                             batch_send_every_n,
+                                             batch_send_every_t)
+
+    def _next_partition(self, topic):
+        if topic not in self.partition_cycles:
+            if not self.client.has_metadata_for_topic(topic):
+                self.client.load_metadata_for_topics(topic)
+
+            self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
+
+            # Randomize the initial partition that is returned
+            if self.random_start:
+                num_partitions = len(self.client.get_partition_ids_for_topic(topic))
+                for _ in xrange(random.randint(0, num_partitions-1)):
+                    next(self.partition_cycles[topic])
+
+        return next(self.partition_cycles[topic])
+
+    def send_messages(self, topic, *msg):
+        if not isinstance(topic, six.binary_type):
+            topic = topic.encode('utf-8')
+
+        partition = self._next_partition(topic)
+        return super(SimpleProducer, self).send_messages(
+            topic, partition, *msg
+        )
+
+    def __repr__(self):
+        return '<SimpleProducer batch=%s>' % self.async
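
And the simple producer, which encodes the topic and round-robins partitions itself
(placeholder broker and topic again):

    from kafka.client import KafkaClient
    from kafka.producer.simple import SimpleProducer

    client = KafkaClient("localhost:9092")        # placeholder broker address
    producer = SimpleProducer(client, random_start=False)

    # Topic is encoded to bytes for us; payloads must already be bytes.
    producer.send_messages("hadoop_metric", b"metric-1", b"metric-2")
    producer.stop()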

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/protocol.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/protocol.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/protocol.py
new file mode 100644
index 0000000..2a39de6
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/protocol.py
@@ -0,0 +1,604 @@
+import logging
+import struct
+
+import six
+
+from six.moves import xrange
+
+from kafka.codec import (
+    gzip_encode, gzip_decode, snappy_encode, snappy_decode
+)
+from kafka.common import (
+    Message, OffsetAndMessage, TopicAndPartition,
+    BrokerMetadata, TopicMetadata, PartitionMetadata,
+    MetadataResponse, ProduceResponse, FetchResponse,
+    OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
+    ProtocolError, BufferUnderflowError, ChecksumError,
+    ConsumerFetchSizeTooSmall, UnsupportedCodecError
+)
+from kafka.util import (
+    crc32, read_short_string, read_int_string, relative_unpack,
+    write_short_string, write_int_string, group_by_topic_and_partition
+)
+
+log = logging.getLogger("kafka")
+
+ATTRIBUTE_CODEC_MASK = 0x03
+CODEC_NONE = 0x00
+CODEC_GZIP = 0x01
+CODEC_SNAPPY = 0x02
+ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)
+
+
+class KafkaProtocol(object):
+    """
+    Class to encapsulate all of the protocol encoding/decoding.
+    This class does not have any state associated with it, it is purely
+    for organization.
+    """
+    PRODUCE_KEY = 0
+    FETCH_KEY = 1
+    OFFSET_KEY = 2
+    METADATA_KEY = 3
+    OFFSET_COMMIT_KEY = 8
+    OFFSET_FETCH_KEY = 9
+
+    ###################
+    #   Private API   #
+    ###################
+
+    @classmethod
+    def _encode_message_header(cls, client_id, correlation_id, request_key):
+        """
+        Encode the common request envelope
+        """
+        return struct.pack('>hhih%ds' % len(client_id),
+                           request_key,          # ApiKey
+                           0,                    # ApiVersion
+                           correlation_id,       # CorrelationId
+                           len(client_id),       # ClientId size
+                           client_id)            # ClientId
+
+    @classmethod
+    def _encode_message_set(cls, messages):
+        """
+        Encode a MessageSet. Unlike other arrays in the protocol,
+        MessageSets are not length-prefixed
+
+        Format
+        ======
+        MessageSet => [Offset MessageSize Message]
+          Offset => int64
+          MessageSize => int32
+        """
+        message_set = []
+        for message in messages:
+            encoded_message = KafkaProtocol._encode_message(message)
+            message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0,
+                                           len(encoded_message),
+                                           encoded_message))
+        return b''.join(message_set)
+
+    @classmethod
+    def _encode_message(cls, message):
+        """
+        Encode a single message.
+
+        The magic number of a message is a format version number.
+        The only supported magic number right now is zero
+
+        Format
+        ======
+        Message => Crc MagicByte Attributes Key Value
+          Crc => int32
+          MagicByte => int8
+          Attributes => int8
+          Key => bytes
+          Value => bytes
+        """
+        if message.magic == 0:
+            msg = b''.join([
+                struct.pack('>BB', message.magic, message.attributes),
+                write_int_string(message.key),
+                write_int_string(message.value)
+            ])
+            crc = crc32(msg)
+            msg = struct.pack('>I%ds' % len(msg), crc, msg)
+        else:
+            raise ProtocolError("Unexpected magic number: %d" % message.magic)
+        return msg
+
+    @classmethod
+    def _decode_message_set_iter(cls, data):
+        """
+        Iteratively decode a MessageSet
+
+        Reads repeated elements of (offset, message), calling decode_message
+        to decode a single message. Since compressed messages contain further
+        MessageSets, these two methods have been decoupled so that they may
+        recurse easily.
+        """
+        cur = 0
+        read_message = False
+        while cur < len(data):
+            try:
+                ((offset, ), cur) = relative_unpack('>q', data, cur)
+                (msg, cur) = read_int_string(data, cur)
+                for (offset, message) in KafkaProtocol._decode_message(msg, offset):
+                    read_message = True
+                    yield OffsetAndMessage(offset, message)
+            except BufferUnderflowError:
+                # NOTE: Not sure this is correct error handling:
+                # Is it possible to get a BUE if the message set is somewhere
+                # in the middle of the fetch response? If so, we probably have
+                # an issue that's not fetch size too small.
+                # Aren't we ignoring errors if we fail to unpack data by
+                # raising StopIteration()?
+                # If _decode_message() raises a ChecksumError, couldn't that
+                # also be due to the fetch size being too small?
+                if read_message is False:
+                    # If we get a partial read of a message, but haven't
+                    # yielded anything there's a problem
+                    raise ConsumerFetchSizeTooSmall()
+                else:
+                    # End the generator with a plain return; raising
+                    # StopIteration here would break under PEP 479 (Python 3.7+)
+                    return
+
+    @classmethod
+    def _decode_message(cls, data, offset):
+        """
+        Decode a single Message
+
+        The only caller of this method is _decode_message_set_iter.
+        They are decoupled to support nested messages (compressed MessageSets).
+        The offset is read by _decode_message_set_iter and passed in here,
+        since it is part of the MessageSet payload rather than the Message itself.
+        """
+        ((crc, magic, att), cur) = relative_unpack('>IBB', data, 0)
+        if crc != crc32(data[4:]):
+            raise ChecksumError("Message checksum failed")
+
+        (key, cur) = read_int_string(data, cur)
+        (value, cur) = read_int_string(data, cur)
+
+        codec = att & ATTRIBUTE_CODEC_MASK
+
+        if codec == CODEC_NONE:
+            yield (offset, Message(magic, att, key, value))
+
+        elif codec == CODEC_GZIP:
+            gz = gzip_decode(value)
+            for (offset, msg) in KafkaProtocol._decode_message_set_iter(gz):
+                yield (offset, msg)
+
+        elif codec == CODEC_SNAPPY:
+            snp = snappy_decode(value)
+            for (offset, msg) in KafkaProtocol._decode_message_set_iter(snp):
+                yield (offset, msg)
+
+    ##################
+    #   Public API   #
+    ##################
+
+    @classmethod
+    def encode_produce_request(cls, client_id, correlation_id,
+                               payloads=None, acks=1, timeout=1000):
+        """
+        Encode some ProduceRequest structs
+
+        Arguments:
+            client_id: string
+            correlation_id: int
+            payloads: list of ProduceRequest
+            acks: how many acknowledgements the server should wait for
+                0: the server responds without waiting for any acks
+                1: the leader acknowledges after writing to its local log
+                2+: waits for this many replicas to sync
+                -1: waits for all in-sync replicas
+            timeout: Maximum time the server will wait for acks from replicas.
+                This is _not_ a socket timeout
+
+        """
+        payloads = [] if payloads is None else payloads
+        grouped_payloads = group_by_topic_and_partition(payloads)
+
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                  KafkaProtocol.PRODUCE_KEY))
+
+        message.append(struct.pack('>hii', acks, timeout,
+                                   len(grouped_payloads)))
+
+        for topic, topic_payloads in grouped_payloads.items():
+            message.append(struct.pack('>h%dsi' % len(topic), len(topic), topic,
+                                       len(topic_payloads)))
+
+            for partition, payload in topic_payloads.items():
+                msg_set = KafkaProtocol._encode_message_set(payload.messages)
+                message.append(struct.pack('>ii%ds' % len(msg_set), partition,
+                                           len(msg_set), msg_set))
+
+        msg = b''.join(message)
+        return struct.pack('>i%ds' % len(msg), len(msg), msg)
+
+    @classmethod
+    def decode_produce_response(cls, data):
+        """
+        Decode bytes to a ProduceResponse
+
+        Arguments:
+            data: bytes to decode
+
+        """
+        ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
+
+        for i in range(num_topics):
+            ((strlen,), cur) = relative_unpack('>h', data, cur)
+            topic = data[cur:cur + strlen]
+            cur += strlen
+            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+            for i in range(num_partitions):
+                ((partition, error, offset), cur) = relative_unpack('>ihq',
+                                                                    data, cur)
+
+                yield ProduceResponse(topic, partition, error, offset)
+
+    @classmethod
+    def encode_fetch_request(cls, client_id, correlation_id, payloads=None,
+                             max_wait_time=100, min_bytes=4096):
+        """
+        Encode some FetchRequest structs
+
+        Arguments:
+            client_id: string
+            correlation_id: int
+            payloads: list of FetchRequest
+            max_wait_time: int, how long (in ms) to block waiting on min_bytes of data
+            min_bytes: int, the minimum number of bytes to accumulate before
+                       returning the response
+        """
+
+        payloads = [] if payloads is None else payloads
+        grouped_payloads = group_by_topic_and_partition(payloads)
+
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                  KafkaProtocol.FETCH_KEY))
+
+        # -1 is the replica id
+        message.append(struct.pack('>iiii', -1, max_wait_time, min_bytes,
+                                   len(grouped_payloads)))
+
+        for topic, topic_payloads in grouped_payloads.items():
+            message.append(write_short_string(topic))
+            message.append(struct.pack('>i', len(topic_payloads)))
+            for partition, payload in topic_payloads.items():
+                message.append(struct.pack('>iqi', partition, payload.offset,
+                                           payload.max_bytes))
+
+        msg = b''.join(message)
+        return struct.pack('>i%ds' % len(msg), len(msg), msg)
+
+    @classmethod
+    def decode_fetch_response(cls, data):
+        """
+        Decode bytes to a FetchResponse
+
+        Arguments:
+            data: bytes to decode
+        """
+        ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
+
+        for i in range(num_topics):
+            (topic, cur) = read_short_string(data, cur)
+            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+
+            for i in range(num_partitions):
+                ((partition, error, highwater_mark_offset), cur) = \
+                    relative_unpack('>ihq', data, cur)
+
+                (message_set, cur) = read_int_string(data, cur)
+
+                yield FetchResponse(
+                    topic, partition, error,
+                    highwater_mark_offset,
+                    KafkaProtocol._decode_message_set_iter(message_set))
+
+    @classmethod
+    def encode_offset_request(cls, client_id, correlation_id, payloads=None):
+        payloads = [] if payloads is None else payloads
+        grouped_payloads = group_by_topic_and_partition(payloads)
+
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                  KafkaProtocol.OFFSET_KEY))
+
+        # -1 is the replica id
+        message.append(struct.pack('>ii', -1, len(grouped_payloads)))
+
+        for topic, topic_payloads in grouped_payloads.items():
+            message.append(write_short_string(topic))
+            message.append(struct.pack('>i', len(topic_payloads)))
+
+            for partition, payload in topic_payloads.items():
+                message.append(struct.pack('>iqi', partition, payload.time,
+                                           payload.max_offsets))
+
+        msg = b''.join(message)
+        return struct.pack('>i%ds' % len(msg), len(msg), msg)
+
+    @classmethod
+    def decode_offset_response(cls, data):
+        """
+        Decode bytes to an OffsetResponse
+
+        Arguments:
+            data: bytes to decode
+        """
+        ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
+
+        for i in range(num_topics):
+            (topic, cur) = read_short_string(data, cur)
+            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+
+            for i in range(num_partitions):
+                ((partition, error, num_offsets,), cur) = \
+                    relative_unpack('>ihi', data, cur)
+
+                offsets = []
+                for j in range(num_offsets):
+                    ((offset,), cur) = relative_unpack('>q', data, cur)
+                    offsets.append(offset)
+
+                yield OffsetResponse(topic, partition, error, tuple(offsets))
+
+    @classmethod
+    def encode_metadata_request(cls, client_id, correlation_id, topics=None,
+                                payloads=None):
+        """
+        Encode a MetadataRequest
+
+        Arguments:
+            client_id: string
+            correlation_id: int
+            topics: list of strings
+            payloads: optional list used in place of topics when given
+        """
+        if payloads is None:
+            topics = [] if topics is None else topics
+        else:
+            topics = payloads
+
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                  KafkaProtocol.METADATA_KEY))
+
+        message.append(struct.pack('>i', len(topics)))
+
+        for topic in topics:
+            message.append(struct.pack('>h%ds' % len(topic), len(topic), topic))
+
+        msg = b''.join(message)
+        return write_int_string(msg)
+
+    @classmethod
+    def decode_metadata_response(cls, data):
+        """
+        Decode bytes to a MetadataResponse
+
+        Arguments:
+            data: bytes to decode
+        """
+        ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
+
+        # Broker info
+        brokers = []
+        for i in range(numbrokers):
+            ((nodeId, ), cur) = relative_unpack('>i', data, cur)
+            (host, cur) = read_short_string(data, cur)
+            ((port,), cur) = relative_unpack('>i', data, cur)
+            brokers.append(BrokerMetadata(nodeId, host, port))
+
+        # Topic info
+        ((num_topics,), cur) = relative_unpack('>i', data, cur)
+        topic_metadata = []
+
+        for i in range(num_topics):
+            ((topic_error,), cur) = relative_unpack('>h', data, cur)
+            (topic_name, cur) = read_short_string(data, cur)
+            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+            partition_metadata = []
+
+            for j in range(num_partitions):
+                ((partition_error_code, partition, leader, numReplicas), cur) = \
+                    relative_unpack('>hiii', data, cur)
+
+                (replicas, cur) = relative_unpack(
+                    '>%di' % numReplicas, data, cur)
+
+                ((num_isr,), cur) = relative_unpack('>i', data, cur)
+                (isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
+
+                partition_metadata.append(
+                    PartitionMetadata(topic_name, partition, leader,
+                                      replicas, isr, partition_error_code)
+                )
+
+            topic_metadata.append(
+                TopicMetadata(topic_name, topic_error, partition_metadata)
+            )
+
+        return MetadataResponse(brokers, topic_metadata)
+
+    @classmethod
+    def encode_offset_commit_request(cls, client_id, correlation_id,
+                                     group, payloads):
+        """
+        Encode some OffsetCommitRequest structs
+
+        Arguments:
+            client_id: string
+            correlation_id: int
+            group: string, the consumer group you are committing offsets for
+            payloads: list of OffsetCommitRequest
+        """
+        grouped_payloads = group_by_topic_and_partition(payloads)
+
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                  KafkaProtocol.OFFSET_COMMIT_KEY))
+        message.append(write_short_string(group))
+        message.append(struct.pack('>i', len(grouped_payloads)))
+
+        for topic, topic_payloads in grouped_payloads.items():
+            message.append(write_short_string(topic))
+            message.append(struct.pack('>i', len(topic_payloads)))
+
+            for partition, payload in topic_payloads.items():
+                message.append(struct.pack('>iq', partition, payload.offset))
+                message.append(write_short_string(payload.metadata))
+
+        msg = b''.join(message)
+        return struct.pack('>i%ds' % len(msg), len(msg), msg)
+
+    @classmethod
+    def decode_offset_commit_response(cls, data):
+        """
+        Decode bytes to an OffsetCommitResponse
+
+        Arguments:
+            data: bytes to decode
+        """
+        ((correlation_id,), cur) = relative_unpack('>i', data, 0)
+        ((num_topics,), cur) = relative_unpack('>i', data, cur)
+
+        for i in range(num_topics):
+            (topic, cur) = read_short_string(data, cur)
+            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+
+            for i in range(num_partitions):
+                ((partition, error), cur) = relative_unpack('>ih', data, cur)
+                yield OffsetCommitResponse(topic, partition, error)
+
+    @classmethod
+    def encode_offset_fetch_request(cls, client_id, correlation_id,
+                                    group, payloads):
+        """
+        Encode some OffsetFetchRequest structs
+
+        Arguments:
+            client_id: string
+            correlation_id: int
+            group: string, the consumer group you are fetching offsets for
+            payloads: list of OffsetFetchRequest
+        """
+        grouped_payloads = group_by_topic_and_partition(payloads)
+
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                  KafkaProtocol.OFFSET_FETCH_KEY))
+
+        message.append(write_short_string(group))
+        message.append(struct.pack('>i', len(grouped_payloads)))
+
+        for topic, topic_payloads in grouped_payloads.items():
+            message.append(write_short_string(topic))
+            message.append(struct.pack('>i', len(topic_payloads)))
+
+            for partition, payload in topic_payloads.items():
+                message.append(struct.pack('>i', partition))
+
+        msg = b''.join(message)
+        return struct.pack('>i%ds' % len(msg), len(msg), msg)
+
+    @classmethod
+    def decode_offset_fetch_response(cls, data):
+        """
+        Decode bytes to an OffsetFetchResponse
+
+        Arguments:
+            data: bytes to decode
+        """
+
+        ((correlation_id,), cur) = relative_unpack('>i', data, 0)
+        ((num_topics,), cur) = relative_unpack('>i', data, cur)
+
+        for i in range(num_topics):
+            (topic, cur) = read_short_string(data, cur)
+            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+
+            for i in range(num_partitions):
+                ((partition, offset), cur) = relative_unpack('>iq', data, cur)
+                (metadata, cur) = read_short_string(data, cur)
+                ((error,), cur) = relative_unpack('>h', data, cur)
+
+                yield OffsetFetchResponse(topic, partition, offset,
+                                          metadata, error)
+
+
+def create_message(payload, key=None):
+    """
+    Construct a Message
+
+    Arguments:
+        payload: bytes, the payload to send to Kafka
+        key: bytes, a key used for partition routing (optional)
+
+    """
+    return Message(0, 0, key, payload)
+
+
+def create_gzip_message(payloads, key=None):
+    """
+    Construct a Gzipped Message containing multiple Messages
+
+    The given payloads will be encoded, compressed, and sent as a single atomic
+    message to Kafka.
+
+    Arguments:
+        payloads: list(bytes), a list of payloads to be sent to Kafka
+        key: bytes, a key used for partition routing (optional)
+
+    """
+    message_set = KafkaProtocol._encode_message_set(
+        [create_message(payload, key) for payload in payloads])
+
+    gzipped = gzip_encode(message_set)
+    codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
+
+    return Message(0, 0x00 | codec, key, gzipped)
+
+
+def create_snappy_message(payloads, key=None):
+    """
+    Construct a Snappy Message containing multiple Messages
+
+    The given payloads will be encoded, compressed, and sent as a single atomic
+    message to Kafka.
+
+    Arguments:
+        payloads: list(bytes), a list of payloads to be sent to Kafka
+        key: bytes, a key used for partition routing (optional)
+
+    """
+    message_set = KafkaProtocol._encode_message_set(
+        [create_message(payload, key) for payload in payloads])
+
+    snapped = snappy_encode(message_set)
+    codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
+
+    return Message(0, 0x00 | codec, key, snapped)
+
+
+def create_message_set(messages, codec=CODEC_NONE, key=None):
+    """Create a message set using the given codec.
+
+    If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
+    return a list containing a single codec-encoded message.
+    """
+    if codec == CODEC_NONE:
+        return [create_message(m, key) for m in messages]
+    elif codec == CODEC_GZIP:
+        return [create_gzip_message(messages, key)]
+    elif codec == CODEC_SNAPPY:
+        return [create_snappy_message(messages, key)]
+    else:
+        raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
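
For a quick sanity check, the encode/decode pair above can be exercised on
its own. A minimal round-trip sketch (illustration only, assuming this file
lands at kafka/protocol.py so it is importable as kafka.protocol; the
underscore-prefixed helpers are internal API):

    # Illustration only: round-trip one message through the internal helpers.
    from kafka.protocol import KafkaProtocol, create_message

    msg = create_message(b"hello kafka", key=b"k1")   # Message(0, 0, key, payload)
    encoded = KafkaProtocol._encode_message(msg)      # crc + magic + attributes + key + value

    # _decode_message yields (offset, Message); the offset is supplied by the
    # caller because it lives in the enclosing MessageSet, not in the Message.
    for offset, decoded in KafkaProtocol._decode_message(encoded, 0):
        assert decoded.key == b"k1" and decoded.value == b"hello kafka"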

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/util.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/util.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/util.py
new file mode 100644
index 0000000..14d2b2c
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/kafka/util.py
@@ -0,0 +1,153 @@
+import binascii
+import collections
+import struct
+import sys
+from threading import Thread, Event
+
+import six
+
+from kafka.common import BufferUnderflowError
+
+
+def crc32(data):
+    return binascii.crc32(data) & 0xffffffff
+
+
+def write_int_string(s):
+    if s is not None and not isinstance(s, six.binary_type):
+        raise TypeError('Expected "%s" to be bytes\n'
+                        'data=%s' % (type(s), repr(s)))
+    if s is None:
+        return struct.pack('>i', -1)
+    else:
+        return struct.pack('>i%ds' % len(s), len(s), s)
+
+
+def write_short_string(s):
+    if s is not None and not isinstance(s, six.binary_type):
+        raise TypeError('Expected "%s" to be bytes\n'
+                        'data=%s' % (type(s), repr(s)))
+    if s is None:
+        return struct.pack('>h', -1)
+    elif len(s) > 32767 and sys.version_info < (2, 7):
+        # Python 2.6 issues a deprecation warning instead of a struct error
+        raise struct.error(len(s))
+    else:
+        return struct.pack('>h%ds' % len(s), len(s), s)
+
+
+def read_short_string(data, cur):
+    if len(data) < cur + 2:
+        raise BufferUnderflowError("Not enough data left")
+
+    (strlen,) = struct.unpack('>h', data[cur:cur + 2])
+    if strlen == -1:
+        return None, cur + 2
+
+    cur += 2
+    if len(data) < cur + strlen:
+        raise BufferUnderflowError("Not enough data left")
+
+    out = data[cur:cur + strlen]
+    return out, cur + strlen
+
+
+def read_int_string(data, cur):
+    if len(data) < cur + 4:
+        raise BufferUnderflowError(
+            "Not enough data left to read string len (%d < %d)" %
+            (len(data), cur + 4))
+
+    (strlen,) = struct.unpack('>i', data[cur:cur + 4])
+    if strlen == -1:
+        return None, cur + 4
+
+    cur += 4
+    if len(data) < cur + strlen:
+        raise BufferUnderflowError("Not enough data left")
+
+    out = data[cur:cur + strlen]
+    return out, cur + strlen
+
+
+def relative_unpack(fmt, data, cur):
+    size = struct.calcsize(fmt)
+    if len(data) < cur + size:
+        raise BufferUnderflowError("Not enough data left")
+
+    out = struct.unpack(fmt, data[cur:cur + size])
+    return out, cur + size
+
+
+def group_by_topic_and_partition(tuples):
+    out = collections.defaultdict(dict)
+    for t in tuples:
+        out[t.topic][t.partition] = t
+    return out
+
+
+def kafka_bytestring(s):
+    """
+    Takes a string or bytes instance
+    Returns bytes, encoding strings in utf-8 as necessary
+    """
+    if isinstance(s, six.binary_type):
+        return s
+    if isinstance(s, six.string_types):
+        return s.encode('utf-8')
+    raise TypeError(s)
+
+
+class ReentrantTimer(object):
+    """
+    A timer that can be restarted, unlike threading.Timer
+    (although this uses threading.Timer)
+
+    Arguments:
+
+        t: timer interval in milliseconds
+        fn: a callable to invoke
+        args: tuple of args to be passed to function
+        kwargs: keyword arguments to be passed to function
+    """
+    def __init__(self, t, fn, *args, **kwargs):
+
+        if t <= 0:
+            raise ValueError('Invalid timeout value')
+
+        if not callable(fn):
+            raise ValueError('fn must be callable')
+
+        self.thread = None
+        self.t = t / 1000.0
+        self.fn = fn
+        self.args = args
+        self.kwargs = kwargs
+        self.active = None
+
+    def _timer(self, active):
+        # python2.6 Event.wait() always returns None
+        # python2.7 and greater returns the flag value (true/false)
+        # we want the flag value, so add an 'or' here for python2.6
+        # this is redundant for later python versions (FLAG OR FLAG == FLAG)
+        while not (active.wait(self.t) or active.is_set()):
+            self.fn(*self.args, **self.kwargs)
+
+    def start(self):
+        if self.thread is not None:
+            self.stop()
+
+        self.active = Event()
+        self.thread = Thread(target=self._timer, args=(self.active,))
+        self.thread.daemon = True  # So the app exits when main thread exits
+        self.thread.start()
+
+    def stop(self):
+        if self.thread is None:
+            return
+
+        self.active.set()
+        self.thread.join(self.t + 1)
+        # noinspection PyAttributeOutsideInit
+        self.timer = None
+        self.fn = None
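
These helpers implement the cursor-style wire format used by protocol.py:
writers emit a length prefix followed by the raw bytes, and readers take
(data, cur) and return (value, new_cur). A small round trip (illustration
only, assuming the module is importable as kafka.util):

    import struct
    from kafka.util import (write_short_string, write_int_string,
                            read_short_string, read_int_string, relative_unpack)

    # Build a buffer the same way the protocol encoders do...
    buf = b"".join([
        write_short_string(b"my-topic"),   # int16 length + bytes
        write_int_string(b"payload"),      # int32 length + bytes
        struct.pack(">i", 42),             # trailing raw int32
    ])

    # ...and walk it back with the cursor-style readers.
    (topic, cur) = read_short_string(buf, 0)
    (value, cur) = read_int_string(buf, cur)
    ((answer,), cur) = relative_unpack(">i", buf, cur)
    assert (topic, value, answer, cur) == (b"my-topic", b"payload", 42, len(buf))

ReentrantTimer backs the consumer's periodic offset auto-commit; polling
Event.wait() instead of re-creating threading.Timer objects lets stop() wake
the worker immediately rather than waiting out the interval.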

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/load_example.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/load_example.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/load_example.py
new file mode 100755
index 0000000..1f8b418
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/load_example.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+import threading, logging, time, collections
+
+from kafka.client import KafkaClient
+from kafka.consumer import SimpleConsumer
+from kafka.producer import SimpleProducer
+
+msg_size = 524288
+
+class Producer(threading.Thread):
+    daemon = True
+    big_msg = "1" * msg_size
+
+    def run(self):
+        client = KafkaClient("localhost:9092")
+        producer = SimpleProducer(client)
+        self.sent = 0
+
+        while True:
+            producer.send_messages('my-topic', self.big_msg)
+            self.sent += 1
+
+
+class Consumer(threading.Thread):
+    daemon = True
+
+    def run(self):
+        client = KafkaClient("localhost:9092")
+        consumer = SimpleConsumer(client, "test-group", "my-topic",
+                                  max_buffer_size=None)
+        self.valid = 0
+        self.invalid = 0
+
+        for message in consumer:
+            if len(message.message.value) == msg_size:
+                self.valid += 1
+            else:
+                self.invalid += 1
+
+def main():
+    threads = [
+        Producer(),
+        Consumer()
+    ]
+
+    for t in threads:
+        t.start()
+
+    time.sleep(10)
+    print('Messages sent: %d' % threads[0].sent)
+    print('Messages recvd: %d' % threads[1].valid)
+    print('Messages invalid: %d' % threads[1].invalid)
+
+if __name__ == "__main__":
+    logging.basicConfig(
+        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+        level=logging.DEBUG
+        )
+    main()

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.0/resources/kafka.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.0/resources/kafka.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.0/resources/kafka.properties
new file mode 100644
index 0000000..c9fd552
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.0/resources/kafka.properties
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+############################# Server Basics #############################
+
+broker.id={broker_id}
+
+############################# Socket Server Settings #############################
+
+port={port}
+host.name={host}
+
+num.network.threads=2
+num.io.threads=2
+
+socket.send.buffer.bytes=1048576
+socket.receive.buffer.bytes=1048576
+socket.request.max.bytes=104857600
+
+############################# Log Basics #############################
+
+log.dirs={tmp_dir}/data
+num.partitions={partitions}
+default.replication.factor={replicas}
+
+############################# Log Flush Policy #############################
+
+log.flush.interval.messages=10000
+log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+log.retention.hours=168
+log.segment.bytes=536870912
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
+zookeeper.connection.timeout.ms=1000000
+
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
+kafka.csv.metrics.dir={tmp_dir}
+kafka.csv.metrics.reporter.enabled=false
+
+log.cleanup.policy=delete
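
The {broker_id}, {port}, {tmp_dir} and similar tokens are str.format-style
placeholders: this properties file is a template that the library's test
fixtures fill in before starting a broker. The fixture code is not part of
this patch, so the helper below is only a hypothetical sketch of how such a
template could be rendered:

    # Hypothetical sketch; render_template is not part of this patch.
    def render_template(template_path, output_path, **props):
        with open(template_path) as f:
            rendered = f.read().format(**props)   # fill the {placeholder} tokens
        with open(output_path, "w") as f:
            f.write(rendered)

    render_template("servers/0.8.0/resources/kafka.properties",
                    "/tmp/kafka-fixture/kafka.properties",
                    broker_id=0, port=9092, host="127.0.0.1",
                    tmp_dir="/tmp/kafka-fixture", partitions=2, replicas=1,
                    zk_host="127.0.0.1", zk_port=2181, zk_chroot="kafka-python")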

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.0/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.0/resources/log4j.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.0/resources/log4j.properties
new file mode 100644
index 0000000..f863b3b
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.0/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.kafka=DEBUG, stdout
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout
+log4j.logger.org.apache.zookeeper=INFO, stdout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.0/resources/zookeeper.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.0/resources/zookeeper.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.0/resources/zookeeper.properties
new file mode 100644
index 0000000..68e1ef9
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.0/resources/zookeeper.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dataDir={tmp_dir}
+clientPortAddress={host}
+clientPort={port}
+maxClientCnxns=0

