eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [4/7] incubator-eagle git commit: EAGLE-120 EAGLE-100 initial system and hadoop metric initial system and hadoop metric https://issues.apache.org/jira/browse/EAGLE-120 Author: qingwen220 qingwzhao@ebay.com Reviewer: yonzhang2012 yonzhang2012@apache.org C
Date Wed, 13 Jan 2016 01:08:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/kafka.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/kafka.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/kafka.properties
new file mode 100644
index 0000000..a638f39
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/kafka.properties
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id={broker_id}
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port={port}
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+host.name={host}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=2
+ 
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma-separated list of directories under which to store log files
+log.dirs={tmp_dir}/data
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions={partitions}
+default.replication.factor={replicas}
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk. 
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.retention.check.interval.ms=60000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/log4j.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/log4j.properties
new file mode 100644
index 0000000..f863b3b
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.kafka=DEBUG, stdout
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout
+log4j.logger.org.apache.zookeeper=INFO, stdout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/zookeeper.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/zookeeper.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/zookeeper.properties
new file mode 100644
index 0000000..e3fd097
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/zookeeper.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir={tmp_dir}
+# the port at which the clients will connect
+clientPort={port}
+clientPortAddress={host}
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/kafka.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/kafka.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/kafka.properties
new file mode 100644
index 0000000..5d47520
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/kafka.properties
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+############################# Server Basics #############################
+
+broker.id={broker_id}
+
+############################# Socket Server Settings #############################
+
+port={port}
+host.name={host}
+
+num.network.threads=2
+num.io.threads=2
+
+socket.send.buffer.bytes=1048576
+socket.receive.buffer.bytes=1048576
+socket.request.max.bytes=104857600
+
+############################# Log Basics #############################
+
+log.dirs={tmp_dir}/data
+num.partitions={partitions}
+default.replication.factor={replicas}
+
+############################# Log Flush Policy #############################
+
+log.flush.interval.messages=10000
+log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+log.retention.hours=168
+log.segment.bytes=536870912
+log.retention.check.interval.ms=60000
+log.cleanup.interval.mins=1
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
+zookeeper.connection.timeout.ms=1000000

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/log4j.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/log4j.properties
new file mode 100644
index 0000000..f863b3b
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.kafka=DEBUG, stdout
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout
+log4j.logger.org.apache.zookeeper=INFO, stdout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/zookeeper.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/zookeeper.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/zookeeper.properties
new file mode 100644
index 0000000..68e1ef9
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/zookeeper.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dataDir={tmp_dir}
+clientPortAddress={host}
+clientPort={port}
+maxClientCnxns=0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/kafka.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/kafka.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/kafka.properties
new file mode 100644
index 0000000..a638f39
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/kafka.properties
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id={broker_id}
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port={port}
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+host.name={host}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=2
+ 
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma-separated list of directories under which to store log files
+log.dirs={tmp_dir}/data
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions={partitions}
+default.replication.factor={replicas}
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk. 
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.retention.check.interval.ms=60000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/log4j.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/log4j.properties
new file mode 100644
index 0000000..f863b3b
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.kafka=DEBUG, stdout
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout
+log4j.logger.org.apache.zookeeper=INFO, stdout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/zookeeper.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/zookeeper.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/zookeeper.properties
new file mode 100644
index 0000000..e3fd097
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/zookeeper.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir={tmp_dir}
+# the port at which the clients will connect
+clientPort={port}
+clientPortAddress={host}
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/kafka.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/kafka.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/kafka.properties
new file mode 100644
index 0000000..a638f39
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/kafka.properties
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id={broker_id}
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port={port}
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+host.name={host}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=2
+ 
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma-separated list of directories under which to store log files
+log.dirs={tmp_dir}/data
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions={partitions}
+default.replication.factor={replicas}
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk. 
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.retention.check.interval.ms=60000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/log4j.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/log4j.properties
new file mode 100644
index 0000000..f863b3b
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.kafka=DEBUG, stdout
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout
+log4j.logger.org.apache.zookeeper=INFO, stdout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/zookeeper.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/zookeeper.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/zookeeper.properties
new file mode 100644
index 0000000..e3fd097
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/zookeeper.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir={tmp_dir}
+# the port at which the clients will connect
+clientPort={port}
+clientPortAddress={host}
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/setup.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/setup.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/setup.py
new file mode 100644
index 0000000..f1c1954
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/setup.py
@@ -0,0 +1,70 @@
+import sys
+import os
+from setuptools import setup, Command
+
+with open('VERSION', 'r') as v:
+    __version__ = v.read().rstrip()
+
+
+class Tox(Command):
+
+    user_options = []
+
+    def initialize_options(self):
+        pass
+
+    def finalize_options(self):
+        pass
+
+    @classmethod
+    def run(cls):
+        import tox
+        sys.exit(tox.cmdline([]))
+
+
+test_require = ['tox', 'mock']
+if sys.version_info < (2, 7):
+    test_require.append('unittest2')
+
+here = os.path.abspath(os.path.dirname(__file__))
+
+with open(os.path.join(here, 'README.rst')) as f:
+    README = f.read()
+
+setup(
+    name="kafka-python",
+    version=__version__,
+
+    tests_require=test_require,
+    cmdclass={"test": Tox},
+
+    packages=[
+        "kafka",
+        "kafka.consumer",
+        "kafka.partitioner",
+        "kafka.producer",
+    ],
+
+    author="David Arthur",
+    author_email="mumrah@gmail.com",
+    url="https://github.com/mumrah/kafka-python",
+    license="Apache License 2.0",
+    description="Pure Python client for Apache Kafka",
+    long_description=README,
+    keywords="apache kafka",
+    install_requires=['six'],
+    classifiers=[
+        "Development Status :: 4 - Beta",
+        "Intended Audience :: Developers",
+        "License :: OSI Approved :: Apache Software License",
+        "Programming Language :: Python",
+        "Programming Language :: Python :: 2",
+        "Programming Language :: Python :: 2.6",
+        "Programming Language :: Python :: 2.7",
+        "Programming Language :: Python :: 3",
+        "Programming Language :: Python :: 3.3",
+        "Programming Language :: Python :: 3.4",
+        "Programming Language :: Python :: Implementation :: PyPy",
+        "Topic :: Software Development :: Libraries :: Python Modules",
+    ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/__init__.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/__init__.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/__init__.py
new file mode 100644
index 0000000..c4d1e80
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/__init__.py
@@ -0,0 +1,6 @@
+import sys
+
+if sys.version_info < (2, 7):
+    import unittest2 as unittest
+else:
+    import unittest

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/fixtures.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/fixtures.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/fixtures.py
new file mode 100644
index 0000000..3c496fd
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/fixtures.py
@@ -0,0 +1,236 @@
+import logging
+import os
+import os.path
+import shutil
+import subprocess
+import tempfile
+from six.moves import urllib
+import uuid
+
+from six.moves.urllib.parse import urlparse  # pylint: disable-msg=E0611
+from test.service import ExternalService, SpawnedService
+from test.testutil import get_open_port
+
+class Fixture(object):
+    kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0')
+    scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
+    project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+    kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
+    ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache"))
+
+    @classmethod
+    def download_official_distribution(cls,
+                                       kafka_version=None,
+                                       scala_version=None,
+                                       output_dir=None):
+        if not kafka_version:
+            kafka_version = cls.kafka_version
+        if not scala_version:
+            scala_version = cls.scala_version
+        if not output_dir:
+            output_dir = os.path.join(cls.project_root, 'servers', 'dist')
+
+        distfile = 'kafka_%s-%s' % (scala_version, kafka_version,)
+        url_base = 'https://archive.apache.org/dist/kafka/%s/' % (kafka_version,)
+        output_file = os.path.join(output_dir, distfile + '.tgz')
+
+        if os.path.isfile(output_file):
+            logging.info("Found file already on disk: %s", output_file)
+            return output_file
+
+        # New tarballs are .tgz, older ones are sometimes .tar.gz
+        try:
+            url = url_base + distfile + '.tgz'
+            logging.info("Attempting to download %s", url)
+            response = urllib.request.urlopen(url)
+        except urllib.error.HTTPError:
+            logging.exception("HTTP Error")
+            url = url_base + distfile + '.tar.gz'
+            logging.info("Attempting to download %s", url)
+            response = urllib.request.urlopen(url)
+
+        logging.info("Saving distribution file to %s", output_file)
+        with open(output_file, 'w') as output_file_fd:
+            output_file_fd.write(response.read())
+
+        return output_file
+
+    @classmethod
+    def test_resource(cls, filename):
+        return os.path.join(cls.project_root, "servers", cls.kafka_version, "resources", filename)
+
+    @classmethod
+    def kafka_run_class_args(cls, *args):
+        result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')]
+        result.extend(args)
+        return result
+
+    @classmethod
+    def kafka_run_class_env(cls):
+        env = os.environ.copy()
+        env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % cls.test_resource("log4j.properties")
+        return env
+
+    @classmethod
+    def render_template(cls, source_file, target_file, binding):
+        with open(source_file, "r") as handle:
+            template = handle.read()
+        with open(target_file, "w") as handle:
+            handle.write(template.format(**binding))
+
+
+class ZookeeperFixture(Fixture):
+    @classmethod
+    def instance(cls):
+        if "ZOOKEEPER_URI" in os.environ:
+            parse = urlparse(os.environ["ZOOKEEPER_URI"])
+            (host, port) = (parse.hostname, parse.port)
+            fixture = ExternalService(host, port)
+        else:
+            (host, port) = ("127.0.0.1", get_open_port())
+            fixture = cls(host, port)
+
+        fixture.open()
+        return fixture
+
+    def __init__(self, host, port):
+        self.host = host
+        self.port = port
+
+        self.tmp_dir = None
+        self.child = None
+
+    def out(self, message):
+        logging.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message)
+
+    def open(self):
+        self.tmp_dir = tempfile.mkdtemp()
+        self.out("Running local instance...")
+        logging.info("  host    = %s", self.host)
+        logging.info("  port    = %s", self.port)
+        logging.info("  tmp_dir = %s", self.tmp_dir)
+
+        # Generate configs
+        template = self.test_resource("zookeeper.properties")
+        properties = os.path.join(self.tmp_dir, "zookeeper.properties")
+        self.render_template(template, properties, vars(self))
+
+        # Configure Zookeeper child process
+        args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
+        env = self.kafka_run_class_env()
+        self.child = SpawnedService(args, env)
+
+        # Party!
+        self.out("Starting...")
+        self.child.start()
+        self.child.wait_for(r"binding to port")
+        self.out("Done!")
+
+    def close(self):
+        self.out("Stopping...")
+        self.child.stop()
+        self.child = None
+        self.out("Done!")
+        shutil.rmtree(self.tmp_dir)
+
+
+class KafkaFixture(Fixture):
+    @classmethod
+    def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
+        if zk_chroot is None:
+            zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
+        if "KAFKA_URI" in os.environ:
+            parse = urlparse(os.environ["KAFKA_URI"])
+            (host, port) = (parse.hostname, parse.port)
+            fixture = ExternalService(host, port)
+        else:
+            (host, port) = ("127.0.0.1", get_open_port())
+            fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions)
+            fixture.open()
+        return fixture
+
+    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
+        self.host = host
+        self.port = port
+
+        self.broker_id = broker_id
+
+        self.zk_host = zk_host
+        self.zk_port = zk_port
+        self.zk_chroot = zk_chroot
+
+        self.replicas = replicas
+        self.partitions = partitions
+
+        self.tmp_dir = None
+        self.child = None
+        self.running = False
+
+    def out(self, message):
+        logging.info("*** Kafka [%s:%d]: %s", self.host, self.port, message)
+
+    def open(self):
+        if self.running:
+            self.out("Instance already running")
+            return
+
+        self.tmp_dir = tempfile.mkdtemp()
+        self.out("Running local instance...")
+        logging.info("  host       = %s", self.host)
+        logging.info("  port       = %s", self.port)
+        logging.info("  broker_id  = %s", self.broker_id)
+        logging.info("  zk_host    = %s", self.zk_host)
+        logging.info("  zk_port    = %s", self.zk_port)
+        logging.info("  zk_chroot  = %s", self.zk_chroot)
+        logging.info("  replicas   = %s", self.replicas)
+        logging.info("  partitions = %s", self.partitions)
+        logging.info("  tmp_dir    = %s", self.tmp_dir)
+
+        # Create directories
+        os.mkdir(os.path.join(self.tmp_dir, "logs"))
+        os.mkdir(os.path.join(self.tmp_dir, "data"))
+
+        # Generate configs
+        template = self.test_resource("kafka.properties")
+        properties = os.path.join(self.tmp_dir, "kafka.properties")
+        self.render_template(template, properties, vars(self))
+
+        # Configure Kafka child process
+        args = self.kafka_run_class_args("kafka.Kafka", properties)
+        env = self.kafka_run_class_env()
+        self.child = SpawnedService(args, env)
+
+        # Party!
+        self.out("Creating Zookeeper chroot node...")
+        args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
+                                         "-server", "%s:%d" % (self.zk_host, self.zk_port),
+                                         "create",
+                                         "/%s" % self.zk_chroot,
+                                         "kafka-python")
+        env = self.kafka_run_class_env()
+        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+        if proc.wait() != 0:
+            self.out("Failed to create Zookeeper chroot node")
+            self.out(proc.stdout.read())
+            self.out(proc.stderr.read())
+            raise RuntimeError("Failed to create Zookeeper chroot node")
+        self.out("Done!")
+
+        self.out("Starting...")
+        self.child.start()
+        self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id)
+        self.out("Done!")
+        self.running = True
+
+    def close(self):
+        if not self.running:
+            self.out("Instance already stopped")
+            return
+
+        self.out("Stopping...")
+        self.child.stop()
+        self.child = None
+        self.out("Done!")
+        shutil.rmtree(self.tmp_dir)
+        self.running = False

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/service.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/service.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/service.py
new file mode 100644
index 0000000..dcd3e68
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/service.py
@@ -0,0 +1,111 @@
+import logging
+import re
+import select
+import subprocess
+import threading
+import time
+
+__all__ = [
+    'ExternalService',
+    'SpawnedService',
+
+]
+
+class ExternalService(object):
+    def __init__(self, host, port):
+        logging.info("Using already running service at %s:%d", host, port)
+        self.host = host
+        self.port = port
+
+    def open(self):
+        pass
+
+    def close(self):
+        pass
+
+
+class SpawnedService(threading.Thread):
+    def __init__(self, args=None, env=None):
+        threading.Thread.__init__(self)
+
+        if args is None:
+            raise TypeError("args parameter is required")
+        self.args = args
+        self.env = env
+        self.captured_stdout = []
+        self.captured_stderr = []
+
+        self.should_die = threading.Event()
+
+    def run(self):
+        self.run_with_handles()
+
+    def run_with_handles(self):
+        self.child = subprocess.Popen(
+            self.args,
+            env=self.env,
+            bufsize=1,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE)
+        alive = True
+
+        while True:
+            (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1)
+
+            if self.child.stdout in rds:
+                line = self.child.stdout.readline()
+                self.captured_stdout.append(line.decode('utf-8'))
+
+            if self.child.stderr in rds:
+                line = self.child.stderr.readline()
+                self.captured_stderr.append(line.decode('utf-8'))
+
+            if self.should_die.is_set():
+                self.child.terminate()
+                alive = False
+
+            poll_results = self.child.poll()
+            if poll_results is not None:
+                if not alive:
+                    break
+                else:
+                    self.dump_logs()
+                    raise RuntimeError("Subprocess has died. Aborting. (args=%s)" % ' '.join(str(x) for x in self.args))
+
+    def dump_logs(self):
+        logging.critical('stderr')
+        for line in self.captured_stderr:
+            logging.critical(line.rstrip())
+
+        logging.critical('stdout')
+        for line in self.captured_stdout:
+            logging.critical(line.rstrip())
+
+    def wait_for(self, pattern, timeout=30):
+        t1 = time.time()
+        while True:
+            t2 = time.time()
+            if t2 - t1 >= timeout:
+                try:
+                    self.child.kill()
+                except:
+                    logging.exception("Received exception when killing child process")
+                self.dump_logs()
+
+                raise RuntimeError("Waiting for %r timed out after %d seconds" % (pattern, timeout))
+
+            if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None:
+                logging.info("Found pattern %r in %d seconds via stdout", pattern, (t2 - t1))
+                return
+            if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None:
+                logging.info("Found pattern %r in %d seconds via stderr", pattern, (t2 - t1))
+                return
+            time.sleep(0.1)
+
+    def start(self):
+        threading.Thread.start(self)
+
+    def stop(self):
+        self.should_die.set()
+        self.join()
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client.py
new file mode 100644
index 0000000..c522d9a
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client.py
@@ -0,0 +1,403 @@
+import socket
+from time import sleep
+
+from mock import ANY, MagicMock, patch
+import six
+from . import unittest
+
+from kafka import KafkaClient
+from kafka.common import (
+    ProduceRequest, MetadataResponse,
+    BrokerMetadata, TopicMetadata, PartitionMetadata,
+    TopicAndPartition, KafkaUnavailableError,
+    LeaderNotAvailableError, UnknownTopicOrPartitionError,
+    KafkaTimeoutError, ConnectionError
+)
+from kafka.conn import KafkaConnection
+from kafka.protocol import KafkaProtocol, create_message
+
+from test.testutil import Timer
+
+NO_ERROR = 0
+UNKNOWN_TOPIC_OR_PARTITION = 3
+NO_LEADER = 5
+
+class TestKafkaClient(unittest.TestCase):
+    def test_init_with_list(self):
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+
+        self.assertEqual(
+            sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
+            sorted(client.hosts))
+
+    def test_init_with_csv(self):
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+
+        self.assertEqual(
+            sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
+            sorted(client.hosts))
+
+    def test_init_with_unicode_csv(self):
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092')
+
+        self.assertEqual(
+            sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
+            sorted(client.hosts))
+
+    def test_send_broker_unaware_request_fail(self):
+        'Tests that call fails when all hosts are unavailable'
+
+        mocked_conns = {
+            ('kafka01', 9092): MagicMock(),
+            ('kafka02', 9092): MagicMock()
+        }
+
+        # inject KafkaConnection side effects
+        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+        mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)")
+
+        def mock_get_conn(host, port):
+            return mocked_conns[(host, port)]
+
+        # patch to avoid making requests before we want it
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+                client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
+
+                req = KafkaProtocol.encode_metadata_request(b'client', 0)
+                with self.assertRaises(KafkaUnavailableError):
+                    client._send_broker_unaware_request(payloads=['fake request'],
+                                                        encoder_fn=MagicMock(return_value='fake encoded message'),
+                                                        decoder_fn=lambda x: x)
+
+                for key, conn in six.iteritems(mocked_conns):
+                    conn.send.assert_called_with(ANY, 'fake encoded message')
+
+    def test_send_broker_unaware_request(self):
+        'Tests that call works when at least one of the host is available'
+
+        mocked_conns = {
+            ('kafka01', 9092): MagicMock(),
+            ('kafka02', 9092): MagicMock(),
+            ('kafka03', 9092): MagicMock()
+        }
+        # inject KafkaConnection side effects
+        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+        mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
+        mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
+
+        def mock_get_conn(host, port):
+            return mocked_conns[(host, port)]
+
+        # patch to avoid making requests before we want it
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+                with patch.object(KafkaClient, '_next_id', return_value=1):
+                    client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+                    resp = client._send_broker_unaware_request(payloads=['fake request'],
+                                                               encoder_fn=MagicMock(),
+                                                               decoder_fn=lambda x: x)
+
+                    self.assertEqual('valid response', resp)
+                    mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_load_metadata(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
+
+        topics = [
+            TopicMetadata('topic_1', NO_ERROR, [
+                PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR)
+            ]),
+            TopicMetadata('topic_noleader', NO_ERROR, [
+                PartitionMetadata('topic_noleader', 0, -1, [], [],
+                                  NO_LEADER),
+                PartitionMetadata('topic_noleader', 1, -1, [], [],
+                                  NO_LEADER),
+            ]),
+            TopicMetadata('topic_no_partitions', NO_LEADER, []),
+            TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
+            TopicMetadata('topic_3', NO_ERROR, [
+                PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR),
+                PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR),
+                PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR)
+            ])
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+        # client loads metadata at init
+        client = KafkaClient(hosts=['broker_1:4567'])
+        self.assertDictEqual({
+            TopicAndPartition('topic_1', 0): brokers[1],
+            TopicAndPartition('topic_noleader', 0): None,
+            TopicAndPartition('topic_noleader', 1): None,
+            TopicAndPartition('topic_3', 0): brokers[0],
+            TopicAndPartition('topic_3', 1): brokers[1],
+            TopicAndPartition('topic_3', 2): brokers[0]},
+            client.topics_to_brokers)
+
+        # if we ask for metadata explicitly, it should raise errors
+        with self.assertRaises(LeaderNotAvailableError):
+            client.load_metadata_for_topics('topic_no_partitions')
+
+        with self.assertRaises(UnknownTopicOrPartitionError):
+            client.load_metadata_for_topics('topic_unknown')
+
+        # This should not raise
+        client.load_metadata_for_topics('topic_no_leader')
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_has_metadata_for_topic(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
+
+        topics = [
+            TopicMetadata('topic_still_creating', NO_LEADER, []),
+            TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+            TopicMetadata('topic_noleaders', NO_ERROR, [
+                PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
+                PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
+            ]),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        # Topics with no partitions return False
+        self.assertFalse(client.has_metadata_for_topic('topic_still_creating'))
+        self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist'))
+
+        # Topic with partition metadata, but no leaders return True
+        self.assertTrue(client.has_metadata_for_topic('topic_noleaders'))
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_ensure_topic_exists(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
+
+        topics = [
+            TopicMetadata('topic_still_creating', NO_LEADER, []),
+            TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+            TopicMetadata('topic_noleaders', NO_ERROR, [
+                PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
+                PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
+            ]),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        with self.assertRaises(UnknownTopicOrPartitionError):
+            client.ensure_topic_exists('topic_doesnt_exist', timeout=1)
+
+        with self.assertRaises(KafkaTimeoutError):
+            client.ensure_topic_exists('topic_still_creating', timeout=1)
+
+        # This should not raise
+        client.ensure_topic_exists('topic_noleaders', timeout=1)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
+        "Get leader for partitions reload metadata if it is not available"
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
+
+        topics = [
+            TopicMetadata('topic_no_partitions', NO_LEADER, [])
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        # topic metadata is loaded but empty
+        self.assertDictEqual({}, client.topics_to_brokers)
+
+        topics = [
+            TopicMetadata('topic_one_partition', NO_ERROR, [
+                PartitionMetadata('topic_no_partition', 0, 0, [0, 1], [0, 1], NO_ERROR)
+            ])
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+        # calling _get_leader_for_partition (from any broker aware request)
+        # will try loading metadata again for the same topic
+        leader = client._get_leader_for_partition('topic_one_partition', 0)
+
+        self.assertEqual(brokers[0], leader)
+        self.assertDictEqual({
+            TopicAndPartition('topic_one_partition', 0): brokers[0]},
+            client.topics_to_brokers)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_get_leader_for_unassigned_partitions(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
+
+        topics = [
+            TopicMetadata('topic_no_partitions', NO_LEADER, []),
+            TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        self.assertDictEqual({}, client.topics_to_brokers)
+
+        with self.assertRaises(LeaderNotAvailableError):
+            client._get_leader_for_partition('topic_no_partitions', 0)
+
+        with self.assertRaises(UnknownTopicOrPartitionError):
+            client._get_leader_for_partition('topic_unknown', 0)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_get_leader_exceptions_when_noleader(self, protocol, conn):
+        """Leader lookups raise while no leader is known, then succeed."""
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678)
+        ]
+        topics = [
+            TopicMetadata('topic_noleader', NO_ERROR, [
+                PartitionMetadata('topic_noleader', 0, -1, [], [],
+                                  NO_LEADER),
+                PartitionMetadata('topic_noleader', 1, -1, [], [],
+                                  NO_LEADER),
+            ]),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+        self.assertDictEqual(
+            {
+                TopicAndPartition('topic_noleader', 0): None,
+                TopicAndPartition('topic_noleader', 1): None
+            },
+            client.topics_to_brokers)
+
+        # No leader partitions -- raise LeaderNotAvailableError
+        with self.assertRaises(LeaderNotAvailableError):
+            client._get_leader_for_partition('topic_noleader', 0)
+        with self.assertRaises(LeaderNotAvailableError):
+            client._get_leader_for_partition('topic_noleader', 1)
+
+        # Unknown partitions -- raise UnknownTopicOrPartitionError
+        with self.assertRaises(UnknownTopicOrPartitionError):
+            client._get_leader_for_partition('topic_noleader', 2)
+
+        # Swap in metadata that names a leader for each partition.
+        topics = [
+            TopicMetadata('topic_noleader', NO_ERROR, [
+                PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], NO_ERROR),
+                PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], NO_ERROR)
+            ]),
+        ]
+        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+        self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
+        self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_send_produce_request_raises_when_noleader(self, protocol, conn):
+        """Send producer request raises LeaderNotAvailableError if leader is not available"""
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        broker_list = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678),
+        ]
+        # Every partition of the topic is reported with NO_LEADER.
+        leaderless_partitions = [
+            PartitionMetadata('topic_noleader', 0, -1, [], [], NO_LEADER),
+            PartitionMetadata('topic_noleader', 1, -1, [], [], NO_LEADER),
+        ]
+        topic_list = [
+            TopicMetadata('topic_noleader', NO_ERROR, leaderless_partitions),
+        ]
+        metadata = MetadataResponse(broker_list, topic_list)
+        protocol.decode_metadata_response.return_value = metadata
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        payloads = [create_message("a"), create_message("b")]
+        produce_requests = [ProduceRequest("topic_noleader", 0, payloads)]
+
+        # Producing to partition 0 must fail: it has no leader.
+        with self.assertRaises(LeaderNotAvailableError):
+            client.send_produce_request(produce_requests)
+
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn):
+        """A produce request for a topic reported as unknown raises."""
+        conn.recv.return_value = 'response'  # anything but None
+
+        broker_list = [
+            BrokerMetadata(0, 'broker_1', 4567),
+            BrokerMetadata(1, 'broker_2', 5678),
+        ]
+
+        # The only topic in the canned metadata carries a topic-level error.
+        unknown_topic = TopicMetadata(
+            'topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, [])
+        metadata = MetadataResponse(broker_list, [unknown_topic])
+        protocol.decode_metadata_response.return_value = metadata
+
+        client = KafkaClient(hosts=['broker_1:4567'])
+
+        payloads = [create_message("a"), create_message("b")]
+        produce_requests = [ProduceRequest("topic_doesnt_exist", 0, payloads)]
+
+        with self.assertRaises(UnknownTopicOrPartitionError):
+            client.send_produce_request(produce_requests)
+
+    def test_timeout(self):
+        """A connect that times out raises ConnectionError after the wait."""
+        def _slow_connect(*args, **kwargs):
+            sleep(args[1])  # second positional arg is the timeout
+            raise socket.timeout
+
+        slow_socket = patch.object(socket, "create_connection", side_effect=_slow_connect)
+        with slow_socket:
+            with Timer() as timer:
+                with self.assertRaises(ConnectionError):
+                    KafkaConnection("nowhere", 1234, 1.0)
+            self.assertGreaterEqual(timer.interval, 1.0)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client_integration.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client_integration.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client_integration.py
new file mode 100644
index 0000000..c0331ea
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client_integration.py
@@ -0,0 +1,67 @@
+import os
+
+from kafka.common import (
+    FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
+    KafkaTimeoutError
+)
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import (
+    KafkaIntegrationTestCase, kafka_versions
+)
+
+class TestKafkaClientIntegration(KafkaIntegrationTestCase):
+    """Client tests that run against ZooKeeper and Kafka fixtures."""
+
+    @classmethod
+    def setUpClass(cls):  # noqa
+        # Fixtures are only started when KAFKA_VERSION is set.
+        if os.environ.get('KAFKA_VERSION'):
+            cls.zk = ZookeeperFixture.instance()
+            cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+
+    @classmethod
+    def tearDownClass(cls):  # noqa
+        # Without KAFKA_VERSION setUpClass started nothing, so skip cleanup.
+        if os.environ.get('KAFKA_VERSION'):
+            cls.server.close()
+            cls.zk.close()
+
+    @kafka_versions("all")
+    def test_consume_none(self):
+        """A fetch on the test topic returns no error and no messages."""
+        request = FetchRequest(self.topic, 0, 0, 1024)
+        (response,) = self.client.send_fetch_request([request])
+
+        self.assertEqual(response.error, 0)
+        self.assertEqual(response.topic, self.topic)
+        self.assertEqual(response.partition, 0)
+
+        self.assertEqual(len(list(response.messages)), 0)
+
+    @kafka_versions("all")
+    def test_ensure_topic_exists(self):
+        """ensure_topic_exists returns for self.topic, times out otherwise."""
+        # assumes self.topic was created by setUp, so this should not raise
+        self.client.ensure_topic_exists(self.topic, timeout=1)
+
+        # a topic that was never created must end in KafkaTimeoutError
+        missing_topic = b"this_topic_doesnt_exist"
+        with self.assertRaises(KafkaTimeoutError):
+            self.client.ensure_topic_exists(missing_topic, timeout=0)
+
+    ####################
+    #   Offset Tests   #
+    ####################
+
+    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+    def test_commit_fetch_offsets(self):
+        """An offset committed for a group is returned by a later fetch."""
+        commit = OffsetCommitRequest(self.topic, 0, 42, b"metadata")
+        (commit_resp,) = self.client.send_offset_commit_request(b"group", [commit])
+        self.assertEqual(commit_resp.error, 0)
+        fetch = OffsetFetchRequest(self.topic, 0)
+        (fetch_resp,) = self.client.send_offset_fetch_request(b"group", [fetch])
+        self.assertEqual(fetch_resp.error, 0)
+        self.assertEqual(fetch_resp.offset, 42)
+        self.assertEqual(fetch_resp.metadata, b"")  # Metadata isn't stored for now

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_codec.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_codec.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_codec.py
new file mode 100644
index 0000000..2d7670a
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_codec.py
@@ -0,0 +1,72 @@
+import struct
+
+from six.moves import xrange
+from . import unittest
+
+from kafka.codec import (
+    has_snappy, gzip_encode, gzip_decode,
+    snappy_encode, snappy_decode
+)
+
+from test.testutil import random_string
+
+class TestCodec(unittest.TestCase):
+    def test_gzip(self):
+        """gzip_decode(gzip_encode(s)) gives back s for random strings."""
+        for _ in xrange(1000):
+            s1 = random_string(100)
+            self.assertEqual(s1, gzip_decode(gzip_encode(s1)))
+
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_snappy(self):
+        """snappy_decode(snappy_encode(s)) gives back s for random strings."""
+        for _ in xrange(1000):
+            s1 = random_string(100)
+            self.assertEqual(s1, snappy_decode(snappy_encode(s1)))
+
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_snappy_detect_xerial(self):
+        """_detect_xerial_stream accepts the sample header, rejects the rest."""
+        from kafka.codec import _detect_xerial_stream
+
+        header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
+        false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
+        random_snappy = snappy_encode(b'SNAPPY' * 50)
+        short_data = b'\x01\x02\x03\x04'
+
+        self.assertTrue(_detect_xerial_stream(header))
+        self.assertFalse(_detect_xerial_stream(b''))
+        self.assertFalse(_detect_xerial_stream(b'\x00'))
+        self.assertFalse(_detect_xerial_stream(false_header))
+        self.assertFalse(_detect_xerial_stream(random_snappy))
+        self.assertFalse(_detect_xerial_stream(short_data))
+
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_snappy_decode_xerial(self):
+        """snappy_decode handles a header plus two length-prefixed blocks."""
+        header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
+        random_snappy = snappy_encode(b'SNAPPY' * 50)
+        block_len = len(random_snappy)
+        random_snappy2 = snappy_encode(b'XERIAL' * 50)
+        block_len2 = len(random_snappy2)
+        # Parentheses, not backslashes: a trailing '\' would swallow the next line.
+        to_test = (header
+                   + struct.pack('!i', block_len) + random_snappy
+                   + struct.pack('!i', block_len2) + random_snappy2)
+
+        self.assertEqual(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))
+
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_snappy_encode_xerial(self):
+        """snappy_encode in xerial mode reproduces the expected byte stream."""
+        to_ensure = (
+            b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
+            b'\x00\x00\x00\x18'
+            b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
+            b'\x00\x00\x00\x18'
+            b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
+        )
+        to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)
+        compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
+        self.assertEqual(compressed, to_ensure)
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_conn.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_conn.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_conn.py
new file mode 100644
index 0000000..2c8f3b2
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_conn.py
@@ -0,0 +1,164 @@
+import socket
+import struct
+
+import mock
+from . import unittest
+
+from kafka.common import ConnectionError
+from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
+
+class ConnTest(unittest.TestCase):
+    def setUp(self):
+        self.config = {
+            'host': 'localhost',
+            'port': 9090,
+            'request_id': 0,
+            'payload': b'test data',
+            'payload2': b'another packet'
+        }
+
+        # Mocking socket.create_connection will cause _sock to always be a
+        # MagicMock()
+        patcher = mock.patch('socket.create_connection', spec=True)
+        self.MockCreateConn = patcher.start()
+        self.addCleanup(patcher.stop)
+
+        # Also mock socket.sendall() to appear successful
+        self.MockCreateConn().sendall.return_value = None
+
+        # And mock socket.recv() to return two payloads, then '', then raise
+        # Note that this currently ignores the num_bytes parameter to sock.recv()
+        payload_size = len(self.config['payload'])
+        payload2_size = len(self.config['payload2'])
+        self.MockCreateConn().recv.side_effect = [
+            struct.pack('>i', payload_size),
+            struct.pack('>%ds' % payload_size, self.config['payload']),
+            struct.pack('>i', payload2_size),
+            struct.pack('>%ds' % payload2_size, self.config['payload2']),
+            b''
+        ]
+
+        # Create a connection object
+        self.conn = KafkaConnection(self.config['host'], self.config['port'])
+
+        # Reset any mock counts caused by __init__
+        self.MockCreateConn.reset_mock()
+
+    def test_collect_hosts__happy_path(self):
+        """A bare hostname in a comma-separated string gets port 9092."""
+        hosts = "localhost:1234,localhost"
+        results = collect_hosts(hosts)
+        self.assertEqual(set(results), set([
+            ('localhost', 1234),
+            ('localhost', 9092),
+        ]))
+
+    def test_collect_hosts__string_list(self):
+        """collect_hosts also accepts a list of host strings."""
+        hosts = [
+            'localhost:1234',
+            'localhost',
+        ]
+        results = collect_hosts(hosts)
+        self.assertEqual(set(results), set([
+            ('localhost', 1234),
+            ('localhost', 9092),
+        ]))
+
+    def test_collect_hosts__with_spaces(self):
+        """Whitespace after the comma separator is ignored."""
+        hosts = "localhost:1234, localhost"
+        results = collect_hosts(hosts)
+        self.assertEqual(set(results), set([
+            ('localhost', 1234),
+            ('localhost', 9092),
+        ]))
+
+    def test_send(self):
+        """send() hands the payload to the socket's sendall()."""
+        self.conn.send(self.config['request_id'], self.config['payload'])
+        self.conn._sock.sendall.assert_called_with(self.config['payload'])
+
+    def test_init_creates_socket_connection(self):
+        """The constructor connects with the default socket timeout."""
+        KafkaConnection(self.config['host'], self.config['port'])
+        self.MockCreateConn.assert_called_with((self.config['host'], self.config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS)
+
+    def test_init_failure_raises_connection_error(self):
+        """socket.error while connecting is surfaced as ConnectionError."""
+        def raise_error(*args):
+            raise socket.error
+
+        assert socket.create_connection is self.MockCreateConn
+        socket.create_connection.side_effect = raise_error
+        with self.assertRaises(ConnectionError):
+            KafkaConnection(self.config['host'], self.config['port'])
+
+    def test_send__reconnects_on_dirty_conn(self):
+        """send() on a dirtied connection triggers one reconnect."""
+        # Dirty the connection
+        try:
+            self.conn._raise_connection_error()
+        except ConnectionError:
+            pass
+
+        # Now test that sending attempts to reconnect
+        self.assertEqual(self.MockCreateConn.call_count, 0)
+        self.conn.send(self.config['request_id'], self.config['payload'])
+        self.assertEqual(self.MockCreateConn.call_count, 1)
+
+    def test_send__failure_sets_dirty_connection(self):
+        """A sendall() failure raises ConnectionError and clears _sock."""
+        def raise_error(*args):
+            raise socket.error
+
+        assert isinstance(self.conn._sock, mock.Mock)
+        self.conn._sock.sendall.side_effect = raise_error
+        # assertRaises, not try/except: the test must fail if nothing is raised
+        with self.assertRaises(ConnectionError):
+            self.conn.send(self.config['request_id'], self.config['payload'])
+        self.assertIsNone(self.conn._sock)
+
+    def test_recv(self):
+        """recv() returns the first mocked payload."""
+        self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
+
+    def test_recv__reconnects_on_dirty_conn(self):
+        """recv() on a dirtied connection triggers one reconnect."""
+        # Dirty the connection
+        try:
+            self.conn._raise_connection_error()
+        except ConnectionError:
+            pass
+
+        # Now test that recv'ing attempts to reconnect
+        self.assertEqual(self.MockCreateConn.call_count, 0)
+        self.conn.recv(self.config['request_id'])
+        self.assertEqual(self.MockCreateConn.call_count, 1)
+
+    def test_recv__failure_sets_dirty_connection(self):
+        """A socket recv() failure raises ConnectionError and clears _sock."""
+        def raise_error(*args):
+            raise socket.error
+
+        assert isinstance(self.conn._sock, mock.Mock)
+        self.conn._sock.recv.side_effect = raise_error
+        # assertRaises, not try/except: the test must fail if nothing is raised
+        with self.assertRaises(ConnectionError):
+            self.conn.recv(self.config['request_id'])
+        self.assertIsNone(self.conn._sock)
+
+    def test_recv__doesnt_consume_extra_data_in_stream(self):
+        """Successive recv() calls return one payload each, in order."""
+        # Here just test that each call to recv will return a single payload
+        self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
+        self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])
+
+    def test_close__object_is_reusable(self):
+        """A closed connection reconnects on the next send()."""
+        # test that sending to a closed connection
+        # will re-connect and send data to the socket
+        self.conn.close()
+        self.conn.send(self.config['request_id'], self.config['payload'])
+        self.assertEqual(self.MockCreateConn.call_count, 1)
+        self.conn._sock.sendall.assert_called_with(self.config['payload'])

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_consumer.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_consumer.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_consumer.py
new file mode 100644
index 0000000..7b8f370
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_consumer.py
@@ -0,0 +1,15 @@
+
+from mock import MagicMock
+from . import unittest
+
+from kafka import SimpleConsumer, KafkaConsumer
+from kafka.common import KafkaConfigurationError
+
+class TestKafkaConsumer(unittest.TestCase):
+    def test_non_integer_partitions(self):
+        """SimpleConsumer asserts on a partition id that is not an integer."""
+        self.assertRaises(AssertionError, SimpleConsumer, MagicMock(), 'group', 'topic', partitions=['0'])
+
+    def test_broker_list_required(self):
+        """KafkaConsumer() with no arguments is a configuration error."""
+        self.assertRaises(KafkaConfigurationError, KafkaConsumer)


Mime
View raw message