kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1396687 [1/4] - in /incubator/kafka/branches/0.8/system_test: ./ mirror_maker_testsuite/ mirror_maker_testsuite/config/ mirror_maker_testsuite/testcase_5001/ replication_testsuite/ replication_testsuite/config/ replication_testsuite/testca...
Date Wed, 10 Oct 2012 16:57:01 GMT
Author: nehanarkhede
Date: Wed Oct 10 16:56:57 2012
New Revision: 1396687

URL: http://svn.apache.org/viewvc?rev=1396687&view=rev
Log:
KAFKA-502 Adding 30 more system tests, reviewed by Jun and Neha; patched by John Fung

Added:
    incubator/kafka/branches/0.8/system_test/logging.conf
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/__init__.py
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/cluster_config.json
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/console_consumer.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/consumer.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/log4j.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/mirror_consumer.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/mirror_producer.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/producer.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/producer_performance.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/server.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/zookeeper.properties
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/mirror_maker_test.py
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/testcase_5001/
    incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0001/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0002/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0002/testcase_0002_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0003/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0003/testcase_0003_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0004/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0004/testcase_0004_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0005/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0005/testcase_0005_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0006/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0006/testcase_0006_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0007/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0007/testcase_0007_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0008/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0008/testcase_0008_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0009/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0009/testcase_0009_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0010/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0010/testcase_0010_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0021/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0021/cluster_config.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0021/testcase_0021_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0022/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0022/cluster_config.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0022/testcase_0022_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0023/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0023/cluster_config.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0023/testcase_0023_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0101/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0101/testcase_0101_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0102/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0102/testcase_0102_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0103/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0103/testcase_0103_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0104/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0104/testcase_0104_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0105/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0105/testcase_0105_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0106/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0106/testcase_0106_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0107/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0107/testcase_0107_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0108/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0108/testcase_0108_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0109/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0109/testcase_0109_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0110/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0110/testcase_0110_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0111/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0111/testcase_0111_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0112/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0112/testcase_0112_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0113/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0113/testcase_0113_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0114/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0114/testcase_0114_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0115/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0115/testcase_0115_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0116/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0116/testcase_0116_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0117/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0117/testcase_0117_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0118/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0118/testcase_0118_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0121/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0121/cluster_config.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0121/testcase_0121_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0122/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0122/cluster_config.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0122/testcase_0122_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0123/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0123/cluster_config.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0123/testcase_0123_properties.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/cluster_config.json
    incubator/kafka/branches/0.8/system_test/run_sanity.sh
    incubator/kafka/branches/0.8/system_test/testcase_to_run.json
    incubator/kafka/branches/0.8/system_test/testcase_to_run_sanity.json
    incubator/kafka/branches/0.8/system_test/testcase_to_skip.json
    incubator/kafka/branches/0.8/system_test/utils/replication_utils.py
Modified:
    incubator/kafka/branches/0.8/system_test/cluster_config.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties
    incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
    incubator/kafka/branches/0.8/system_test/system_test_env.py
    incubator/kafka/branches/0.8/system_test/system_test_runner.py
    incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
    incubator/kafka/branches/0.8/system_test/utils/metrics.py
    incubator/kafka/branches/0.8/system_test/utils/setup_utils.py
    incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py
    incubator/kafka/branches/0.8/system_test/utils/testcase_env.py

Modified: incubator/kafka/branches/0.8/system_test/cluster_config.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/cluster_config.json?rev=1396687&r1=1396686&r2=1396687&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/cluster_config.json (original)
+++ incubator/kafka/branches/0.8/system_test/cluster_config.json Wed Oct 10 16:56:57 2012
@@ -4,6 +4,7 @@
             "entity_id": "0",
             "hostname": "localhost",
             "role": "zookeeper",
+            "cluster_name": "source",
             "kafka_home": "default",
             "java_home": "default",
             "jmx_port": "9990"
@@ -12,6 +13,7 @@
             "entity_id": "1",
             "hostname": "localhost",
             "role": "broker",
+            "cluster_name": "source",
             "kafka_home": "default",
             "java_home": "default",
             "jmx_port": "9991"
@@ -20,6 +22,7 @@
             "entity_id": "2",
             "hostname": "localhost",
             "role": "broker",
+            "cluster_name": "source",
             "kafka_home": "default",
             "java_home": "default",
             "jmx_port": "9992"
@@ -28,6 +31,7 @@
             "entity_id": "3",
             "hostname": "localhost",
             "role": "broker",
+            "cluster_name": "source",
             "kafka_home": "default",
             "java_home": "default",
             "jmx_port": "9993"
@@ -36,17 +40,19 @@
             "entity_id": "4",
             "hostname": "localhost",
             "role": "producer_performance",
+            "cluster_name": "source",
             "kafka_home": "default",
             "java_home": "default",
-            "jmx_port": "9994"
+            "jmx_port": "9997"
         },
         {
             "entity_id": "5",
             "hostname": "localhost",
             "role": "console_consumer",
+            "cluster_name": "source",
             "kafka_home": "default",
             "java_home": "default",
-            "jmx_port": "9995"
+            "jmx_port": "9998"
         }
     ]
 }

Added: incubator/kafka/branches/0.8/system_test/logging.conf
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/logging.conf?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/logging.conf (added)
+++ incubator/kafka/branches/0.8/system_test/logging.conf Wed Oct 10 16:56:57 2012
@@ -0,0 +1,56 @@
+# ==============================================
+# declaration - must have a 'root' logger
+# ==============================================
+[loggers]
+keys=root,namedLogger,anonymousLogger
+
+[handlers]
+keys=namedConsoleHandler,anonymousConsoleHandler
+
+[formatters]
+keys=namedFormatter,anonymousFormatter
+
+# ==============================================
+# loggers session
+# ==============================================
+[logger_root]
+level=NOTSET
+handlers=
+
+[logger_namedLogger]
+level=DEBUG
+handlers=namedConsoleHandler
+qualname=namedLogger
+propagate=0
+
+[logger_anonymousLogger]
+level=DEBUG
+handlers=anonymousConsoleHandler
+qualname=anonymousLogger
+propagate=0
+
+# ==============================================
+# handlers session
+# ** Change 'level' to INFO/DEBUG in this session
+# ==============================================
+[handler_namedConsoleHandler]
+class=StreamHandler
+level=INFO
+formatter=namedFormatter
+args=[]
+
+[handler_anonymousConsoleHandler]
+class=StreamHandler
+level=INFO
+formatter=anonymousFormatter
+args=[]
+
+# ==============================================
+# formatters session
+# ==============================================
+[formatter_namedFormatter]
+format=%(asctime)s - %(levelname)s - %(message)s %(name_of_class)s
+
+[formatter_anonymousFormatter]
+format=%(asctime)s - %(levelname)s - %(message)s
+

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/__init__.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/__init__.py?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/__init__.py (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/__init__.py Wed Oct 10 16:56:57 2012
@@ -0,0 +1 @@
+ 

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/cluster_config.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/cluster_config.json?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/cluster_config.json (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/cluster_config.json Wed Oct 10 16:56:57 2012
@@ -0,0 +1,126 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9100"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9101"
+        },
+
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9102"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9103"
+        },
+
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9104"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9105"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9106"
+        },
+
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9107"
+        },
+        {
+            "entity_id": "8",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9108"
+        },
+        {
+            "entity_id": "9",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9109"
+        },
+
+        {
+            "entity_id": "10",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9110"
+        },
+        {
+            "entity_id": "11",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9111"
+        },
+
+        {
+            "entity_id": "12",
+            "hostname": "localhost",
+            "role": "mirror_maker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9112"
+        }
+    ]
+}

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/console_consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/console_consumer.properties?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/console_consumer.properties (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/console_consumer.properties Wed Oct 10 16:56:57 2012
@@ -0,0 +1,4 @@
+zookeeper=local:2181
+topic=test_1
+from-beginning
+consumer-timeout-ms=10000

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/consumer.properties?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/consumer.properties (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/consumer.properties Wed Oct 10 16:56:57 2012
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=127.0.0.1:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=test-consumer-group
+
+#consumer timeout
+#consumer.timeout.ms=5000

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/log4j.properties?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/log4j.properties (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/log4j.properties Wed Oct 10 16:56:57 2012
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+#log4j.appender.fileAppender=org.apache.log4j.FileAppender
+#log4j.appender.fileAppender.File=kafka-request.log
+#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
+
+
+# Turn on all our debugging info
+#log4j.logger.kafka=INFO
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+
+# to print message checksum from ProducerPerformance
+log4j.logger.kafka.perf=DEBUG
+log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+
+# to print message checksum from ProducerPerformance
+log4j.logger.kafka.perf=DEBUG
+log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/mirror_consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/mirror_consumer.properties?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/mirror_consumer.properties (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/mirror_consumer.properties Wed Oct 10 16:56:57 2012
@@ -0,0 +1,12 @@
+zk.connect=localhost:2108
+zk.connectiontimeout.ms=1000000
+groupid=mm_regtest_grp
+autocommit.interval.ms=120000
+autooffset.reset=smallest
+#fetch.size=1048576
+#rebalance.retries.max=4
+#rebalance.backoff.ms=2000
+socket.buffersize=1048576
+fetch.size=1048576
+zk.synctime.ms=15000
+shallowiterator.enable=true

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/mirror_producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/mirror_producer.properties?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/mirror_producer.properties (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/mirror_producer.properties Wed Oct 10 16:56:57 2012
@@ -0,0 +1,5 @@
+producer.type=async
+queue.enqueueTimeout.ms=-1
+broker.list=localhost:9094
+compression.codec=0
+

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/producer.properties?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/producer.properties (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/producer.properties Wed Oct 10 16:56:57 2012
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.producer.ProducerConfig for more details
+
+############################# Producer Basics #############################
+
+# need to set either broker.list or zk.connect
+
+# configure brokers statically
+# format: brokerid1:host1:port1,brokerid2:host2:port2 ...
+#broker.list=0:localhost:9092
+
+# discover brokers from ZK
+zk.connect=localhost:2181
+
+# zookeeper session timeout; default is 6000
+#zk.sessiontimeout.ms=
+
+# the max time that the client waits to establish a connection to zookeeper; default is 6000
+#zk.connectiontimeout.ms
+
+# name of the partitioner class for partitioning events; default partition spreads data randomly
+#partitioner.class=
+
+# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
+producer.type=sync
+
+# specify the compression codec for all data generated: 0: no compression, 1: gzip
+compression.codec=0
+
+# message encoder
+serializer.class=kafka.serializer.StringEncoder
+
+# allow topic level compression
+#compressed.topics=
+
+# max message size; messages larger than that size are discarded; default is 1000000
+#max.message.size=
+
+
+############################# Async Producer #############################
+# maximum time, in milliseconds, for buffering data on the producer queue 
+#queue.time=
+
+# the maximum size of the blocking queue for buffering on the producer 
+#queue.size=
+
+# Timeout for event enqueue:
+# 0: events will be enqueued immediately or dropped if the queue is full
+# -ve: enqueue will block indefinitely if the queue is full
+# +ve: enqueue will block up to this many milliseconds if the queue is full
+#queue.enqueueTimeout.ms=
+
+# the number of messages batched at the producer 
+#batch.size=
+
+# the callback handler for one or multiple events 
+#callback.handler=
+
+# properties required to initialize the callback handler 
+#callback.handler.props=
+
+# the handler for events 
+#event.handler=
+
+# properties required to initialize the event handler 
+#event.handler.props=
+

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/producer_performance.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/producer_performance.properties?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/producer_performance.properties (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/producer_performance.properties Wed Oct 10 16:56:57 2012
@@ -0,0 +1,5 @@
+topic=mytest
+message-size=100
+thread=5
+compression-codec=0
+request-num-acks=-1

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/server.properties?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/server.properties (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/server.properties Wed Oct 10 16:56:57 2012
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+brokerid=0
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
+# may not be what you want.
+#hostname=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9091
+
+# The number of threads handling network requests
+network.threads=2
+ 
+# The number of threads doing disk I/O
+io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+max.socket.request.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_logs
+
+# The number of logical partitions per topic per server. More partitions allow greater parallelism
+# for consumption, but also mean more files.
+num.partitions=5
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.default.flush.interval.ms=1000
+
+# Per-topic overrides for log.default.flush.interval.ms
+#topic.flush.intervals.ms=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.default.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.size.
+#log.retention.size=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.file.size=536870912
+#log.file.size=102400
+log.file.size=128
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zk.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+monitoring.period.secs=1

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/zookeeper.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/zookeeper.properties?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/zookeeper.properties (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/config/zookeeper.properties Wed Oct 10 16:56:57 2012
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+syncLimit=5
+initLimit=10
+tickTime=2000

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/mirror_maker_test.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/mirror_maker_test.py?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/mirror_maker_test.py (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/mirror_maker_test.py Wed Oct 10 16:56:57 2012
@@ -0,0 +1,325 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#!/usr/bin/env python
+
+# ===================================
+# mirror_maker_test.py
+# ===================================
+
+import inspect
+import logging
+import os
+import signal
+import subprocess
+import sys
+import time
+import traceback
+
+from   system_test_env    import SystemTestEnv
+sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
+
+from   setup_utils        import SetupUtils
+from   replication_utils  import ReplicationUtils
+import system_test_utils
+from   testcase_env       import TestcaseEnv
+
+# product specific: Kafka
+import kafka_system_test_utils
+import metrics
+
+class MirrorMakerTest(ReplicationUtils, SetupUtils):
+
+    testModuleAbsPathName = os.path.realpath(__file__)
+    testSuiteAbsPathName  = os.path.abspath(os.path.dirname(testModuleAbsPathName))
+
+    def __init__(self, systemTestEnv):
+
+        # SystemTestEnv - provides cluster level environment settings
+        #     such as entity_id, hostname, kafka_home, java_home which
+        #     are available in a list of dictionaries named
+        #     "clusterEntityConfigDictList"
+        self.systemTestEnv = systemTestEnv
+
+        super(MirrorMakerTest, self).__init__(self)
+
+        # dict to pass user-defined attributes to logger argument: "extra"
+        d = {'name_of_class': self.__class__.__name__}
+
+    def signal_handler(self, signal, frame):
+        self.log_message("Interrupt detected - User pressed Ctrl+c")
+
+        # perform the necessary cleanup here when the user presses Ctrl+c; the cleanup may be product-specific
+        self.log_message("stopping all entities - please wait ...")
+        kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
+        sys.exit(1) 
+
+    def runTest(self):
+
+        # ======================================================================
+        # get all testcase directories under this testsuite
+        # ======================================================================
+        testCasePathNameList = system_test_utils.get_dir_paths_with_prefix(
+            self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
+        testCasePathNameList.sort()
+
+        # =============================================================
+        # launch each testcase one by one: testcase_1, testcase_2, ...
+        # =============================================================
+        for testCasePathName in testCasePathNameList:
+   
+            skipThisTestCase = False
+
+            try: 
+                # ======================================================================
+                # A new instance of TestcaseEnv to keep track of this testcase's env vars
+                # and initialize some env vars as testCasePathName is available now
+                # ======================================================================
+                self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self)
+                self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
+                self.testcaseEnv.initWithKnownTestCasePathName(testCasePathName)
+                self.testcaseEnv.testcaseArgumentsDict = self.testcaseEnv.testcaseNonEntityDataDict["testcase_args"]
+
+                # ======================================================================
+                # SKIP if this case is IN testcase_to_skip.json or NOT IN testcase_to_run.json
+                # ======================================================================
+                testcaseDirName = self.testcaseEnv.testcaseResultsDict["_test_case_name"]
+
+                if self.systemTestEnv.printTestDescriptionsOnly:
+                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+                    continue
+                elif self.systemTestEnv.isTestCaseToSkip(self.__class__.__name__, testcaseDirName):
+                    self.log_message("Skipping : " + testcaseDirName)
+                    skipThisTestCase = True
+                    continue
+                else:
+                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+                    system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName)
+
+                # ============================================================================== #
+                # ============================================================================== #
+                #                   Product Specific Testing Code Starts Here:                   #
+                # ============================================================================== #
+                # ============================================================================== #
+    
+                # initialize self.testcaseEnv with user-defined environment variables (product specific)
+                self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
+                self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]    = False
+                self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False
+
+                # initialize signal handler
+                signal.signal(signal.SIGINT, self.signal_handler)
+
+                
+                # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase
+                # for collecting logs from remote machines
+                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
+    
+                # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file:
+                #   system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
+                self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(
+                    self.testcaseEnv.testcasePropJsonPathName)
+    
+                # TestcaseEnv - initialize producer & consumer config / log file pathnames
+                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
+
+                # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
+                kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+                # generate remote hosts log/config dirs if not exist
+                kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+    
+                # generate properties files for zookeeper, kafka, producer, consumer and mirror-maker:
+                # 1. copy system_test/<suite_name>_testsuite/config/*.properties to 
+                #    system_test/<suite_name>_testsuite/testcase_<n>/config/
+                # 2. update all properties files in system_test/<suite_name>_testsuite/testcase_<n>/config
+                #    by overriding the settings specified in:
+                #    system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
+                kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName,
+                    self.testcaseEnv, self.systemTestEnv)
+               
+                # =============================================
+                # preparing all entities to start the test
+                # =============================================
+                self.log_message("starting zookeepers")
+                kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 2s")
+                time.sleep(2)
+
+                self.log_message("starting brokers")
+                kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+
+                
+                self.log_message("starting mirror makers")
+                kafka_system_test_utils.start_mirror_makers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+
+                #print "#### sleeping for 30 min"
+                #time.sleep(1800)
+
+                self.log_message("creating topics")
+                kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+
+                
+                # =============================================
+                # starting producer 
+                # =============================================
+                self.log_message("starting producer in the background")
+                kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv, False)
+                msgProducingFreeTimeSec = self.testcaseEnv.testcaseArgumentsDict["message_producing_free_time_sec"]
+                self.anonLogger.info("sleeping for " + msgProducingFreeTimeSec + " sec to produce some messages")
+                time.sleep(int(msgProducingFreeTimeSec))
+
+                # =============================================
+                # A while-loop to bounce leader as specified
+                # by "num_iteration" in testcase_n_properties.json
+                # =============================================
+                i = 1
+                numIterations = int(self.testcaseEnv.testcaseArgumentsDict["num_iteration"])
+                while i <= numIterations:
+
+                    self.log_message("Iteration " + str(i) + " of " + str(numIterations))
+
+                    self.log_message("looking up leader")
+                    leaderDict = kafka_system_test_utils.get_leader_elected_log_line(
+                        self.systemTestEnv, self.testcaseEnv, self.leaderAttributesDict)
+        
+                    # ==========================
+                    # leaderDict looks like this:
+                    # ==========================
+                    #{'entity_id': u'3',
+                    # 'partition': '0',
+                    # 'timestamp': 1345050255.8280001,
+                    # 'hostname': u'localhost',
+                    # 'topic': 'test_1',
+                    # 'brokerid': '3'}
+
+                    # =============================================
+                    # validate to see if leader election is successful
+                    # =============================================
+                    self.log_message("validating leader election")
+                    result = kafka_system_test_utils.validate_leader_election_successful( 
+                         self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
+        
+                    # =============================================
+                    # trigger leader re-election by stopping leader
+                    # to get re-election latency
+                    # =============================================
+                    bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"]
+                    self.log_message("bounce_leader flag : " + bounceLeaderFlag)
+                    if (bounceLeaderFlag.lower() == "true"):
+                        reelectionLatency = kafka_system_test_utils.get_reelection_latency(
+                            self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict)
+                        latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"]
+                        self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms"
+       
+                    # =============================================
+                    # starting previously terminated broker 
+                    # =============================================
+                    if bounceLeaderFlag.lower() == "true":
+                        self.log_message("starting the previously terminated broker")
+                        stoppedLeaderEntityId  = leaderDict["entity_id"]
+                        kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
+
+                    self.anonLogger.info("sleeping for 15s")
+                    time.sleep(15)
+                    i += 1
+                # while loop
+
+                # =============================================
+                # tell producer to stop
+                # =============================================
+                self.testcaseEnv.lock.acquire()
+                self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
+                time.sleep(1)
+                self.testcaseEnv.lock.release()
+                time.sleep(1)
+
+                # =============================================
+                # wait for producer thread's update of
+                # "backgroundProducerStopped" to be "True"
+                # =============================================
+                while 1:
+                    self.testcaseEnv.lock.acquire()
+                    self.logger.info("status of backgroundProducerStopped : [" + \
+                        str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d)
+                    if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
+                        time.sleep(1)
+                        self.logger.info("all producer threads completed", extra=self.d)
+                        break
+                    time.sleep(1)
+                    self.testcaseEnv.lock.release()
+                    time.sleep(2)
+
+                # =============================================
+                # starting consumer
+                # =============================================
+                self.log_message("starting consumer in the background")
+                kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 10s")
+                time.sleep(10)
+                    
+                # =============================================
+                # this testcase is completed - stop all entities
+                # =============================================
+                self.log_message("stopping all entities")
+                for entityId, parentPid in self.testcaseEnv.entityBrokerParentPidDict.items():
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+                for entityId, parentPid in self.testcaseEnv.entityZkParentPidDict.items():
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+                # make sure all entities are stopped
+                kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv)
+
+                # =============================================
+                # collect logs from remote hosts
+                # =============================================
+                kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+    
+                # =============================================
+                # validate the data matched and checksum
+                # =============================================
+                self.log_message("validating data matched")
+                kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
+                kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+
+                # =============================================
+                # draw graphs
+                # =============================================
+                metrics.draw_all_graphs(self.systemTestEnv.METRICS_PATHNAME, 
+                                        self.testcaseEnv, 
+                                        self.systemTestEnv.clusterEntityConfigDictList)
+                
+                # build dashboard, one for each role
+                metrics.build_all_dashboards(self.systemTestEnv.METRICS_PATHNAME,
+                                             self.testcaseEnv.testCaseDashboardsDir,
+                                             self.systemTestEnv.clusterEntityConfigDictList)
+                
+            except Exception as e:
+                self.log_message("Exception while running test {0}".format(e))
+                traceback.print_exc()
+
+            finally:
+                if not skipThisTestCase and not self.systemTestEnv.printTestDescriptionsOnly:
+                    self.log_message("stopping all entities - please wait ...")
+                    kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
+

Added: incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json (added)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json Wed Oct 10 16:56:57 2012
@@ -0,0 +1,135 @@
+{
+  "description": {"01":"To Test : 'Replication with Mirror Maker'",
+                  "02":"Set up 2 clusters such as : SOURCE => MirrorMaker => TARGET",
+                  "03":"Set up 2-node Zk cluster for both SOURCE & TARGET",
+                  "04":"Produce and consume messages to a single topic - single partition.",
+                  "05":"This test sends messages to 3 replicas",
+                  "06":"At the end it verifies the log size and contents",
+                  "07":"Use a consumer to verify no message loss in TARGET cluster.",
+                  "08":"Producer dimensions : mode:sync, acks:-1, comp:0",
+                  "09":"Log segment size    : 10240"
+  },
+  "testcase_args": {
+    "bounce_leader": "false",
+    "replica_factor": "3",
+    "num_partition": "1",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "clientPort": "2118",
+      "dataDir": "/tmp/zookeeper_1",
+      "log_filename": "zookeeper_1.log",
+      "config_filename": "zookeeper_1.properties"
+    },
+
+    {
+      "entity_id": "2",
+      "clientPort": "2128",
+      "dataDir": "/tmp/zookeeper_2",
+      "log_filename": "zookeeper_2.log",
+      "config_filename": "zookeeper_2.properties"
+    },
+    {
+      "entity_id": "3",
+      "clientPort": "2138",
+      "dataDir": "/tmp/zookeeper_3",
+      "log_filename": "zookeeper_3.log",
+      "config_filename": "zookeeper_3.properties"
+    },
+
+    {
+      "entity_id": "4",
+      "port": "9091",
+      "brokerid": "1",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_4_logs",
+      "log_filename": "kafka_server_4.log",
+      "config_filename": "kafka_server_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "port": "9092",
+      "brokerid": "2",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_5_logs",
+      "log_filename": "kafka_server_5.log",
+      "config_filename": "kafka_server_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "port": "9093",
+      "brokerid": "3",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_6_logs",
+      "log_filename": "kafka_server_6.log",
+      "config_filename": "kafka_server_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "port": "9094",
+      "brokerid": "4",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_7_logs",
+      "log_filename": "kafka_server_7.log",
+      "config_filename": "kafka_server_7.properties"
+    },
+    {
+      "entity_id": "8",
+      "port": "9095",
+      "brokerid": "5",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_8_logs",
+      "log_filename": "kafka_server_8.log",
+      "config_filename": "kafka_server_8.properties"
+    },
+    {
+      "entity_id": "9",
+      "port": "9096",
+      "brokerid": "6",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_9_logs",
+      "log_filename": "kafka_server_9.log",
+      "config_filename": "kafka_server_9.properties"
+    },
+
+    {
+      "entity_id": "10",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "500",
+      "request-num-acks": "-1",
+      "async":"false",
+      "log_filename": "producer_performance_10.log",
+      "config_filename": "producer_performance_10.properties"
+    },
+    {
+      "entity_id": "11",
+      "topic": "test_1",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_11.log",
+      "config_filename": "console_consumer_11.properties"
+    },
+
+    {
+      "entity_id": "12",
+      "log_filename": "mirror_maker_12.log",
+      "mirror_consumer_config_filename": "mirror_consumer_12.properties",
+      "mirror_producer_config_filename": "mirror_producer_12.properties"
+    }
+   ]
+}

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties?rev=1396687&r1=1396686&r2=1396687&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties Wed Oct 10 16:56:57 2012
@@ -2,3 +2,4 @@ topic=mytest
 message-size=100
 thread=5
 compression-codec=0
+request-num-acks=-1

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py?rev=1396687&r1=1396686&r2=1396687&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py Wed Oct 10 16:56:57 2012
@@ -31,7 +31,9 @@ import traceback
 
 from   system_test_env    import SystemTestEnv
 sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
+
 from   setup_utils        import SetupUtils
+from   replication_utils  import ReplicationUtils
 import system_test_utils
 from   testcase_env       import TestcaseEnv
 
@@ -39,12 +41,10 @@ from   testcase_env       import Testcas
 import kafka_system_test_utils
 import metrics
 
-class ReplicaBasicTest(SetupUtils):
+class ReplicaBasicTest(ReplicationUtils, SetupUtils):
 
     testModuleAbsPathName = os.path.realpath(__file__)
     testSuiteAbsPathName  = os.path.abspath(os.path.dirname(testModuleAbsPathName))
-    isLeaderLogPattern    = "Completed the leader state transition"
-    brokerShutDownCompletedPattern = "shut down completed"
 
     def __init__(self, systemTestEnv):
 
@@ -54,20 +54,24 @@ class ReplicaBasicTest(SetupUtils):
         #     "clusterEntityConfigDictList"
         self.systemTestEnv = systemTestEnv
 
+        super(ReplicaBasicTest, self).__init__(self)
+
         # dict to pass user-defined attributes to logger argument: "extra"
         d = {'name_of_class': self.__class__.__name__}
 
     def signal_handler(self, signal, frame):
         self.log_message("Interrupt detected - User pressed Ctrl+c")
 
-        for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
-            kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, self.testcaseEnv, entityId, parentPid)
-
+        # perform the necessary cleanup here when user presses Ctrl+c and it may be product specific
+        self.log_message("stopping all entities - please wait ...")
+        kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
         sys.exit(1) 
 
     def runTest(self):
 
+        # ======================================================================
         # get all testcase directories under this testsuite
+        # ======================================================================
         testCasePathNameList = system_test_utils.get_dir_paths_with_prefix(
             self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
         testCasePathNameList.sort()
@@ -76,75 +80,48 @@ class ReplicaBasicTest(SetupUtils):
         # launch each testcase one by one: testcase_1, testcase_2, ...
         # =============================================================
         for testCasePathName in testCasePathNameList:
-   
+
+            skipThisTestCase = False
+
             try: 
-                # create a new instance of TestcaseEnv to keep track of this testcase's environment variables
+                # ======================================================================
+                # A new instance of TestcaseEnv to keep track of this testcase's env vars
+                # and initialize some env vars as testCasePathName is available now
+                # ======================================================================
                 self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self)
                 self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
-    
+                self.testcaseEnv.initWithKnownTestCasePathName(testCasePathName)
+                self.testcaseEnv.testcaseArgumentsDict = self.testcaseEnv.testcaseNonEntityDataDict["testcase_args"]
+
                 # ======================================================================
-                # initialize self.testcaseEnv with user-defined environment variables
+                # SKIP if this case is IN testcase_to_skip.json or NOT IN testcase_to_run.json
                 # ======================================================================
-                self.testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] = ReplicaBasicTest.brokerShutDownCompletedPattern
-                self.testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"] = \
-                    "\[(.*?)\] .* \[Kafka Server (.*?)\], " + ReplicaBasicTest.brokerShutDownCompletedPattern
-
-                self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] = ReplicaBasicTest.isLeaderLogPattern
-                self.testcaseEnv.userDefinedEnvVarDict["REGX_LEADER_ELECTION_PATTERN"]  = \
-                    "\[(.*?)\] .* Broker (.*?): " + \
-                    self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + \
-                    " for topic (.*?) partition (.*?) \(.*"
+                testcaseDirName = self.testcaseEnv.testcaseResultsDict["_test_case_name"]
 
+                if self.systemTestEnv.printTestDescriptionsOnly:
+                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+                    continue
+                elif self.systemTestEnv.isTestCaseToSkip(self.__class__.__name__, testcaseDirName):
+                    self.log_message("Skipping : " + testcaseDirName)
+                    skipThisTestCase = True
+                    continue
+                else:
+                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+                    system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName)
+
+
+                # ============================================================================== #
+                # ============================================================================== #
+                #                   Product Specific Testing Code Starts Here:                   #
+                # ============================================================================== #
+                # ============================================================================== #
+    
+                # initialize self.testcaseEnv with user-defined environment variables (product specific)
                 self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
                 self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]    = False
                 self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False
+                self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"] = []
 
-                # find testcase properties json file
-                testcasePropJsonPathName = system_test_utils.get_testcase_prop_json_pathname(testCasePathName)
-                self.logger.debug("testcasePropJsonPathName : " + testcasePropJsonPathName, extra=self.d)
-    
-                # get the dictionary that contains the testcase arguments and description
-                testcaseNonEntityDataDict = system_test_utils.get_json_dict_data(testcasePropJsonPathName)
-    
-                testcaseDirName = os.path.basename(testCasePathName)
-                self.testcaseEnv.testcaseResultsDict["test_case_name"] = testcaseDirName
-    
-                # update testcaseEnv
-                self.testcaseEnv.testCaseBaseDir = testCasePathName
-                self.testcaseEnv.testCaseLogsDir = self.testcaseEnv.testCaseBaseDir + "/logs"
-                self.testcaseEnv.testCaseDashboardsDir = self.testcaseEnv.testCaseBaseDir + "/dashboards"
-
-                # get testcase description
-                testcaseDescription = ""
-                for k,v in testcaseNonEntityDataDict.items():
-                    if ( k == "description" ): testcaseDescription = v
-    
-                # TestcaseEnv.testcaseArgumentsDict initialized, this dictionary keeps track of the
-                # "testcase_args" in the testcase_properties.json such as replica_factor, num_partition, ...
-                self.testcaseEnv.testcaseArgumentsDict = testcaseNonEntityDataDict["testcase_args"]
-    
-                # =================================================================
-                # TestcaseEnv environment settings initialization are completed here
-                # =================================================================
-                # self.testcaseEnv.systemTestBaseDir
-                # self.testcaseEnv.testSuiteBaseDir
-                # self.testcaseEnv.testCaseBaseDir
-                # self.testcaseEnv.testCaseLogsDir
-                # self.testcaseEnv.testcaseArgumentsDict
-    
-                print
-                # display testcase name and arguments
-                self.log_message("Test Case : " + testcaseDirName)
-                for k,v in self.testcaseEnv.testcaseArgumentsDict.items():
-                    self.anonLogger.info("    " + k + " : " + v)
-                self.log_message("Description : " + testcaseDescription)
-   
-                # ================================================================ #
-                # ================================================================ #
-                #            Product Specific Testing Code Starts Here:            #
-                # ================================================================ #
-                # ================================================================ #
-    
                 # initialize signal handler
                 signal.signal(signal.SIGINT, self.signal_handler)
     
@@ -154,11 +131,13 @@ class ReplicaBasicTest(SetupUtils):
     
                 # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file:
                 #   system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
-                self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(testcasePropJsonPathName)
+                self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(
+                    self.testcaseEnv.testcasePropJsonPathName)
     
                 # TestcaseEnv - initialize producer & consumer config / log file pathnames
                 kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
-    
+
+                
                 # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
                 kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
 
@@ -171,7 +150,8 @@ class ReplicaBasicTest(SetupUtils):
                 # 2. update all properties files in system_test/<suite_name>_testsuite/testcase_<n>/config
                 #    by overriding the settings specified in:
                 #    system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
-                kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName, self.testcaseEnv, self.systemTestEnv)
+                kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName,
+                    self.testcaseEnv, self.systemTestEnv)
    
                 # =============================================
                 # preparing all entities to start the test
@@ -183,30 +163,36 @@ class ReplicaBasicTest(SetupUtils):
         
                 self.log_message("starting brokers")
                 kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
-                self.anonLogger.info("sleeping for 2s")
-                time.sleep(2)
-        
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+
                 self.log_message("creating topics")
                 kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
                 self.anonLogger.info("sleeping for 5s")
                 time.sleep(5)
-        
+                
                 # =============================================
                 # starting producer 
                 # =============================================
                 self.log_message("starting producer in the background")
-                kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv)
-                self.anonLogger.info("sleeping for 5s")
-                time.sleep(5)
+                kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv, False)
+                msgProducingFreeTimeSec = self.testcaseEnv.testcaseArgumentsDict["message_producing_free_time_sec"]
+                self.anonLogger.info("sleeping for " + msgProducingFreeTimeSec + " sec to produce some messages")
+                time.sleep(int(msgProducingFreeTimeSec))
 
+                # =============================================
+                # A while-loop to bounce leader as specified
+                # by "num_iterations" in testcase_n_properties.json
+                # =============================================
                 i = 1
                 numIterations = int(self.testcaseEnv.testcaseArgumentsDict["num_iteration"])
                 while i <= numIterations:
 
                     self.log_message("Iteration " + str(i) + " of " + str(numIterations))
 
-                    # looking up leader
-                    leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv, self.testcaseEnv)
+                    self.log_message("looking up leader")
+                    leaderDict = kafka_system_test_utils.get_leader_elected_log_line(
+                        self.systemTestEnv, self.testcaseEnv, self.leaderAttributesDict)
         
                     # ==========================
                     # leaderDict looks like this:
@@ -226,15 +212,18 @@ class ReplicaBasicTest(SetupUtils):
                                  self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
         
                     # =============================================
-                    # get leader re-election latency by stopping leader
+                    # trigger leader re-election by stopping leader
+                    # to get re-election latency
                     # =============================================
                     bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"]
                     self.log_message("bounce_leader flag : " + bounceLeaderFlag)
                     if (bounceLeaderFlag.lower() == "true"):
-                        reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict)
+                        reelectionLatency = kafka_system_test_utils.get_reelection_latency(
+                            self.systemTestEnv, self.testcaseEnv, leaderDict, self.leaderAttributesDict)
                         latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid " + leaderDict["brokerid"]
                         self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency * 1000)) + " ms"
-       
+                        self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency * 1000))
+
                     # =============================================
                     # starting previously terminated broker 
                     # =============================================
@@ -243,24 +232,45 @@ class ReplicaBasicTest(SetupUtils):
                         stoppedLeaderEntityId  = leaderDict["entity_id"]
                         kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
 
-                    self.anonLogger.info("sleeping for 5s")
-                    time.sleep(5)
+                    self.anonLogger.info("sleeping for 15s")
+                    time.sleep(15)
                     i += 1
                 # while loop
 
+                self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = None
+                try:
+                    self.testcaseEnv.validationStatusDict["Leader Election Latency MIN"] = \
+                        min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
+                except:
+                    pass
+
+                self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = None
+                try:
+                    self.testcaseEnv.validationStatusDict["Leader Election Latency MAX"] = \
+                        max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
+                except:
+                    pass
+
+                # =============================================
                 # tell producer to stop
+                # =============================================
                 self.testcaseEnv.lock.acquire()
                 self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
                 time.sleep(1)
                 self.testcaseEnv.lock.release()
                 time.sleep(1)
 
+                # =============================================
+                # wait for producer thread's update of
+                # "backgroundProducerStopped" to be "True"
+                # =============================================
                 while 1:
                     self.testcaseEnv.lock.acquire()
                     self.logger.info("status of backgroundProducerStopped : [" + \
                         str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d)
                     if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
                         time.sleep(1)
+                        self.logger.info("all producer threads completed", extra=self.d)
                         break
                     time.sleep(1)
                     self.testcaseEnv.lock.release()
@@ -274,27 +284,34 @@ class ReplicaBasicTest(SetupUtils):
                 self.anonLogger.info("sleeping for 10s")
                 time.sleep(10)
                     
-                # this testcase is completed - so stopping all entities
+                # =============================================
+                # this testcase is completed - stop all entities
+                # =============================================
                 self.log_message("stopping all entities")
-                for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
+                for entityId, parentPid in self.testcaseEnv.entityBrokerParentPidDict.items():
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+                for entityId, parentPid in self.testcaseEnv.entityZkParentPidDict.items():
                     kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
 
                 # make sure all entities are stopped
                 kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv)
 
-                # validate the data matched
-                # =============================================
-                self.log_message("validating data matched")
-                result = kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
-                
                 # =============================================
                 # collect logs from remote hosts
                 # =============================================
                 kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
     
-                # ==========================
+                # =============================================
+                # validate the data matched and checksum
+                # =============================================
+                self.log_message("validating data matched")
+                kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
+                kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv)
+
+                # =============================================
                 # draw graphs
-                # ==========================
+                # =============================================
                 metrics.draw_all_graphs(self.systemTestEnv.METRICS_PATHNAME, 
                                         self.testcaseEnv, 
                                         self.systemTestEnv.clusterEntityConfigDictList)
@@ -303,30 +320,12 @@ class ReplicaBasicTest(SetupUtils):
                 metrics.build_all_dashboards(self.systemTestEnv.METRICS_PATHNAME,
                                              self.testcaseEnv.testCaseDashboardsDir,
                                              self.systemTestEnv.clusterEntityConfigDictList)
-                
             except Exception as e:
                 self.log_message("Exception while running test {0}".format(e))
                 traceback.print_exc()
-                traceback.print_exc()
 
             finally:
-                self.log_message("stopping all entities")
-
-                for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
-                    kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, entityId, parentPid)
-
-                for entityId, jmxParentPidList in self.testcaseEnv.entityJmxParentPidDict.items():
-                    for jmxParentPid in jmxParentPidList:
-                        kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, entityId, jmxParentPid)
-
-                for hostname, consumerPPid in self.testcaseEnv.consumerHostParentPidDict.items():
-                    consumerEntityId = system_test_utils.get_data_by_lookup_keyval( \
-                        self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id")
-                    kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, consumerEntityId, consumerPPid)
-
-                for hostname, producerPPid in self.testcaseEnv.producerHostParentPidDict.items():
-                    producerEntityId = system_test_utils.get_data_by_lookup_keyval( \
-                        self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id")
-                    kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, producerEntityId, producerPPid)
-
+                if not skipThisTestCase and not self.systemTestEnv.printTestDescriptionsOnly:
+                    self.log_message("stopping all entities - please wait ...")
+                    kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
 

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json Wed Oct 10 16:56:57 2012
@@ -0,0 +1,76 @@
+{
+  "description": {"01":"Replication Basic : Base Test",
+                  "02":"Produce and consume messages to a single topic - single partition.",
+                  "03":"This test sends messages to 3 replicas",
+                  "04":"At the end it verifies the log size and contents",
+                  "05":"Use a consumer to verify no message loss.",
+                  "06":"Producer dimensions : mode:sync, acks:-1, comp:0",
+                  "07":"Log segment size    : 10240"
+  },
+  "testcase_args": {
+    "bounce_leader": "false",
+    "replica_factor": "3",
+    "num_partition": "1",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "brokerid": "1",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "brokerid": "2",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "brokerid": "3",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "-1",
+      "async":"false",
+      "log_filename": "producer_performance.log",
+      "config_filename": "producer_performance.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test_1",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "zookeeper": "localhost:2188",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer.properties"
+    }
+  ]
+}

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0002/testcase_0002_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0002/testcase_0002_properties.json?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0002/testcase_0002_properties.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0002/testcase_0002_properties.json Wed Oct 10 16:56:57 2012
@@ -0,0 +1,76 @@
+{
+  "description": {"01":"Replication Basic : 1. comp => 1",
+                  "02":"Produce and consume messages to a single topic - single partition.",
+                  "03":"This test sends messages to 3 replicas",
+                  "04":"At the end it verifies the log size and contents",
+                  "05":"Use a consumer to verify no message loss.",
+                  "06":"Producer dimensions : mode:sync, acks:-1, comp:1",
+                  "07":"Log segment size    : 10240"
+  },
+  "testcase_args": {
+    "bounce_leader": "false",
+    "replica_factor": "3",
+    "num_partition": "1",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "brokerid": "1",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "brokerid": "2",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "brokerid": "3",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "-1",
+      "async":"false",
+      "log_filename": "producer_performance.log",
+      "config_filename": "producer_performance.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test_1",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "zookeeper": "localhost:2188",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer.properties"
+    }
+  ]
+}



Mime
View raw message