incubator-kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1152970 [26/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Date Mon, 01 Aug 2011 23:42:17 GMT
Added: incubator/kafka/trunk/perf/src/main/java/kafka/perf/PerfTimer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/java/kafka/perf/PerfTimer.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/src/main/java/kafka/perf/PerfTimer.java (added)
+++ incubator/kafka/trunk/perf/src/main/java/kafka/perf/PerfTimer.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,149 @@
+package kafka.perf;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+
+import kafka.message.NoCompressionCodec;
+import kafka.perf.jmx.BrokerJmxClient;
+
+public class PerfTimer extends Thread
+{
+    private final long timeToRun;
+    private final BrokerJmxClient brokerStats;
+    private final KafkaPerfSimulator perfSim;
+    private final int numConsumers, numProducer, numParts, numTopic;
+    private final String reportFile;
+    private final int compression;
+
+    public PerfTimer(BrokerJmxClient brokerStats,
+                     KafkaPerfSimulator perfSim, int numConsumers,
+                     int numProducer, int numParts, int numTopic,
+                     long timeToRun,
+                     String fileName, int compression)
+    {
+        super("PerfTimer");
+        this.timeToRun = timeToRun;
+        this.brokerStats = brokerStats;
+        this.perfSim = perfSim;
+        this.numConsumers = numConsumers;
+        this.numProducer = numProducer;
+        this.numParts = numParts;
+        this.numTopic = numTopic;
+        this.compression = compression;
+        reportFile = fileName;
+    }
+
+
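+    // Ensure the report's parent directory exists; the file itself is
+    // created lazily by FileWriter on first write.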
+    protected File openReportFile(String name) throws Exception
+    {
+        File file = new File(name);
+        if (!file.exists()) {
+            if (file.getParentFile() != null) {
+                file.getParentFile().mkdirs();
+            }
+        }
+        return file;
+    }
+
+
+    public void printMBDataStats() throws Exception
+    {
+        File mbDataFile = openReportFile(reportFile + "/MBdata.csv");
+        boolean writeHeader = !mbDataFile.exists();
+        FileWriter fstream = new FileWriter(mbDataFile, true);
+        BufferedWriter writer = new BufferedWriter(fstream);
+        if(writeHeader)
+            writer.write(perfSim.getXaxisLabel() + ",consumer-MB/sec,total-consumer-MB/sec,producer-MB/sec,total-producer-MB/sec\n");
+        writer.write(perfSim.getXAxisVal() + "," + perfSim.getAvgMBytesRecPs() + "," + (numConsumers * perfSim.getAvgMBytesRecPs()) +
+                "," + perfSim.getAvgMBytesSentPs() + "," + (perfSim.getAvgMBytesSentPs() * numProducer));
+
+        writer.newLine();
+        writer.close();
+        fstream.close();
+    }
+
+
+    public void printMessageDataStats() throws Exception
+    {
+        File file = openReportFile(reportFile + "/NumMessage.csv");
+        boolean writeHeader = !file.exists();
+        FileWriter fstream = new FileWriter(file, true);
+        BufferedWriter writer = new BufferedWriter(fstream);
+        if(writeHeader)
+            writer.write(perfSim.getXaxisLabel() + ",consumer-messages/sec,total-consumer-messages/sec,producer-messages/sec,total-producer-messages/sec\n");
+        writer.write(perfSim.getXAxisVal() + "," + perfSim.getAvgMessagesRecPs() + "," + (numConsumers * perfSim.getAvgMessagesRecPs()) +
+                "," + perfSim.getAvgMessagesSentPs() + "," + (perfSim.getAvgMessagesSentPs() * numProducer));
+
+        writer.newLine();
+        writer.close();
+        fstream.close();
+    }
+
+
+    public void printReport() throws Exception
+    {
+        String header = "#consumers, #of producers, #of partitions, #of topic, " +
+                "consumer mess/sec,consumer MB/sec, producer mess/sec,producer MB/sec, broker MB write/sec, broker MB read/sec";
+        String data = numConsumers+ "," + numProducer + "," + numParts+ "," + numTopic + "," +
+                perfSim.getAvgMessagesRecPs() + "," +
+                perfSim.getAvgMBytesRecPs() + "," +
+                perfSim.getAvgMessagesSentPs() + "," +
+                perfSim.getAvgMBytesSentPs() + "," +
+                brokerStats.getBrokerStats();
+
+        System.out.println(header);
+        System.out.println(data);
+        printMessageDataStats();
+        printMBDataStats();
+        if(compression != NoCompressionCodec.codec())
+            printCompressionRatio();
+    }
+
+    public void printCompressionRatio() throws Exception
+    {
+        if(perfSim.getKafkaServersURL().equals("localhost") && (perfSim.getKafkaServerLogDir() != null)) {
+            File logDir = new File(perfSim.getKafkaServerLogDir());
+            long totalLogLength = 0L;
+            if(logDir.isDirectory()) {
+                File[] files = logDir.listFiles();
+                for(int i = 0; i < files.length; i++)
+                    totalLogLength += files[i].length();
+            } else
+                totalLogLength += logDir.length();
+            System.out.println("Log length = " + totalLogLength);
+            File file = new File(reportFile + "/CompressionRatio.csv");
+            boolean writeHeader = !file.exists();
+            FileWriter fstream = new FileWriter(file, true);
+            BufferedWriter writer = new BufferedWriter(fstream);
+            if(writeHeader)
+                writer.write(perfSim.getXaxisLabel() + ",Compression Ratio\n");
+            writer.write(perfSim.getXAxisVal() + "," + (perfSim.getTotalBytesSent()/(double)totalLogLength));
+
+            writer.newLine();
+            writer.close();
+            fstream.close();
+        }
+    }
+
+    public void run() {
+        try
+        {
+            Thread.sleep(timeToRun);
+        }
+        catch (InterruptedException e)
+        {
+            e.printStackTrace();
+        }
+
+        try
+        {
+            printReport();
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+        }
+        System.exit(0);
+    }
+}

Added: incubator/kafka/trunk/perf/src/main/java/kafka/perf/consumer/SimplePerfConsumer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/java/kafka/perf/consumer/SimplePerfConsumer.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/src/main/java/kafka/perf/consumer/SimplePerfConsumer.java (added)
+++ incubator/kafka/trunk/perf/src/main/java/kafka/perf/consumer/SimplePerfConsumer.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.perf.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+
+import kafka.api.FetchRequest;
+import kafka.javaapi.MultiFetchResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+
+public class SimplePerfConsumer extends Thread
+{
+  private SimpleConsumer simpleConsumer;
+  private String topic;
+  private String consumerName;
+  private int fetchSize;
+  private AtomicLong bytesRec;
+  private AtomicLong messagesRec;
+  private AtomicLong lastReportMessageRec;
+  private AtomicLong lastReportBytesRec;
+  private final long[] offsets; // current fetch offset, tracked per partition
+  private final int numParts;
+
+  public SimplePerfConsumer(String topic, String kafkaServerURL, int kafkaServerPort,
+                            int kafkaProducerBufferSize, int connectionTimeOut, int reconnectInterval,
+                            int fetchSize, String name, int numParts)
+  {
+    super(name);
+    simpleConsumer = new SimpleConsumer(kafkaServerURL,
+                                        kafkaServerPort,
+                                        connectionTimeOut,
+                                        kafkaProducerBufferSize);
+    this.topic = topic; 
+    this.fetchSize = fetchSize;
+    consumerName = name;
+    bytesRec =  new AtomicLong(0L);
+    messagesRec =  new AtomicLong(0L);
+    lastReportMessageRec = new AtomicLong(System.currentTimeMillis());
+    lastReportBytesRec = new AtomicLong(System.currentTimeMillis());
+    this.numParts = numParts;
+    offsets = new long[numParts];
+  }
+
+  public void run() {
+    while(true)
+    {
+      List<FetchRequest> list = new ArrayList<FetchRequest>();
+      for(int i = 0; i < numParts; i++)
+      {
+        FetchRequest req = new FetchRequest(topic, i, offsets[i], fetchSize);
+        list.add(req);
+      }
+
+      MultiFetchResponse response = simpleConsumer.multifetch(list);
+      // message sets come back in request order, so advance each
+      // partition's offset by the valid bytes fetched for it
+      int part = 0;
+      for (ByteBufferMessageSet messages: response)
+      {
+        offsets[part++] += messages.validBytes();
+        bytesRec.getAndAdd(messages.sizeInBytes());
+
+        Iterator<MessageAndOffset> it =  messages.iterator();
+        while(it.hasNext())
+        {
+          it.next();
+          messagesRec.getAndIncrement();
+        }
+      }
+    }
+  }
+
+  public double getMessagesRecPs()
+  {
+    double val = (double)messagesRec.get() / (System.currentTimeMillis() - lastReportMessageRec.get());
+    return val * 1000;
+  }
+
+  public String getConsumerName()
+  {
+    return consumerName;
+  }
+
+  public double getMBytesRecPs()
+  {
+    double val = ((double)bytesRec.get() / (System.currentTimeMillis() - lastReportBytesRec.get())) / (1024*1024);
+    return val * 1000;
+  }
+
+}

Added: incubator/kafka/trunk/perf/src/main/java/kafka/perf/jmx/BrokerJmxClient.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/java/kafka/perf/jmx/BrokerJmxClient.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/src/main/java/kafka/perf/jmx/BrokerJmxClient.java (added)
+++ incubator/kafka/trunk/perf/src/main/java/kafka/perf/jmx/BrokerJmxClient.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,49 @@
+package kafka.perf.jmx;
+
+import javax.management.JMX;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import kafka.network.SocketServerStatsMBean;
+
+public class BrokerJmxClient
+{
+  private final String host;
+  private final int port;
+  private final long time;
+  public BrokerJmxClient(String host, int port,
+                         long time)
+  {
+    this.host = host;
+    this.port = port;
+    this.time = time;
+  }
+  
+  public MBeanServerConnection getMbeanConnection() throws Exception
+  {
+    JMXServiceURL url =
+      new JMXServiceURL("service:jmx:rmi:///jndi/rmi://"+ host+ ":" + port + "/jmxrmi");
+    JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
+    MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+    return mbsc;
+  }
+  
+  public SocketServerStatsMBean createSocketMbean() throws Exception
+  {
+ 
+    ObjectName mbeanName = new ObjectName("kafka:type=kafka.SocketServerStats");
+    SocketServerStatsMBean stats = JMX.newMBeanProxy(getMbeanConnection(), mbeanName, SocketServerStatsMBean.class, true);
+    return stats;
+  }
+  
+  public String getBrokerStats() throws Exception
+  {
+    StringBuffer buf = new StringBuffer();
+    SocketServerStatsMBean stats = createSocketMbean();
+    buf.append(stats.getBytesWrittenPerSecond() / (1024 *1024)  + "," +  stats.getBytesReadPerSecond()  / (1024 *1024) );
+    return buf.toString();
+  }
+}
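
For BrokerJmxClient to reach the broker, the broker JVM must expose remote JMX. A minimal sketch of the standard JVM flags involved (port illustrative; nothing in this patch sets them):

    # enable unauthenticated remote JMX on the broker JVM (test setups only)
    -Dcom.sun.management.jmxremote \
    -Dcom.sun.management.jmxremote.port=9999 \
    -Dcom.sun.management.jmxremote.authenticate=false \
    -Dcom.sun.management.jmxremote.ssl=false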

Added: incubator/kafka/trunk/perf/src/main/java/kafka/perf/producer/Producer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/java/kafka/perf/producer/Producer.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/src/main/java/kafka/perf/producer/Producer.java (added)
+++ incubator/kafka/trunk/perf/src/main/java/kafka/perf/producer/Producer.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.perf.producer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.javaapi.producer.SyncProducer;
+import kafka.producer.SyncProducerConfig;
+
+public class Producer extends Thread
+{
+  private final SyncProducer producer;
+  private final String topic;
+  private final int messageSize;
+  private AtomicLong bytesSent =  new AtomicLong(0L);
+  private AtomicLong messagesSent =  new AtomicLong(0L);
+  private AtomicLong lastReportMessageSent = new AtomicLong(System.currentTimeMillis());
+  private AtomicLong lastReportBytesSent = new AtomicLong(System.currentTimeMillis());
+  private String producerName;
+  private int batchSize;
+  private int numParts;
+  private final int compression;
+
+  public Producer(String topic, String kafkaServerURL, int kafkaServerPort,
+                  int kafkaProducerBufferSize, int connectionTimeOut, int reconnectInterval,
+                  int messageSize, String name, int batchSize, int numParts, int compression)
+  {
+    super(name);
+    Properties props = new Properties();
+    props.put("host", kafkaServerURL);
+    props.put("port", String.valueOf(kafkaServerPort));
+    props.put("buffer.size", String.valueOf(kafkaProducerBufferSize));
+    props.put("connect.timeout.ms", String.valueOf(connectionTimeOut));
+    props.put("reconnect.interval", String.valueOf(reconnectInterval));
+    producer = new SyncProducer(new SyncProducerConfig(props));
+    this.topic = topic; 
+
+    this.messageSize = messageSize;
+    producerName = name;
+    this.batchSize = batchSize;
+    this.numParts = numParts;
+    this.compression = compression;
+  }
+
+  public void run() {
+    Random random = new Random();
+    while(true)
+    {
+      List<Message> messageList = new ArrayList<Message>();
+      for(int i = 0; i < batchSize; i++)
+      {
+        Message message = new Message(new byte[messageSize]);
+        messageList.add(message);
+      }
+      ByteBufferMessageSet set = new ByteBufferMessageSet(kafka.message.CompressionCodec$.MODULE$.getCompressionCodec(compression), messageList);
+      producer.send(topic, random.nextInt(numParts), set);
+      bytesSent.getAndAdd(batchSize * messageSize);
+      messagesSent.getAndAdd(messageList.size());
+    }
+  }
+
+  public double getMessagesSentPs()
+  {
+    double val = (double)messagesSent.get() / (System.currentTimeMillis() - lastReportMessageSent.get());
+    return val * 1000;
+  }
+
+  public String getProducerName()
+  {
+    return producerName;
+  }
+
+  public double getMBytesSentPs()
+  {
+    double val = ((double)bytesSent.get() / (System.currentTimeMillis() - lastReportBytesSent.get())) / (1024*1024);
+    return val * 1000;
+  }
+
+  public long getTotalBytesSent() {
+    return bytesSent.get();
+  }
+}

Added: incubator/kafka/trunk/perf/util-bin/remote-kafka-env.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/remote-kafka-env.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/remote-kafka-env.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/remote-kafka-env.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,31 @@
+
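+# Note: the quoted "~" below is deliberately not expanded locally; it is
+# passed through ssh and expanded by the remote shell.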
+REMOTE_KAFKA_HOME="~/kafka-perf"
+REMOTE_KAFKA_LOG_DIR="$REMOTE_KAFKA_HOME/tmp/kafka-logs"
+SIMULATOR_SCRIPT="$REMOTE_KAFKA_HOME/perf/run-simulator.sh"
+
+REMOTE_KAFKA_HOST=`echo $REMOTE_KAFKA_LOGIN | cut -d @ -f 2`
+REMOTE_SIM_HOST=`echo $REMOTE_SIM_LOGIN | cut -d @ -f 2`
+
+# If we are running the broker on the same box, use the local interface.
+KAFKA_SERVER=$REMOTE_KAFKA_HOST
+if [[ "$REMOTE_KAFKA_HOST" == "$REMOTE_SIM_HOST" ]];
+then
+    KAFKA_SERVER="localhost"
+fi
+
+
+# todo: some echos
+# todo: talkative sleep
+
+function kafka_startup() {
+    ssh $REMOTE_KAFKA_LOGIN "cd $REMOTE_KAFKA_HOME; ./bin/kafka-server-start.sh config/server.properties 2>&1 > kafka.out" &
+    sleep 10
+}
+
+
+function kafka_cleanup() {
+    ssh $REMOTE_KAFKA_LOGIN "cd $REMOTE_KAFKA_HOME; ./bin/kafka-server-stop.sh" &
+    sleep 10
+    ssh $REMOTE_KAFKA_LOGIN "rm -rf $REMOTE_KAFKA_LOG_DIR" &
+    sleep 10
+}

Added: incubator/kafka/trunk/perf/util-bin/run-fetchsize-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-fetchsize-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-fetchsize-test.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-fetchsize-test.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+for i in 1 `seq -s " " 10 10 50` ;
+do
+    kafka_startup
+    ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=10  -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=20 -numProducer=40 -xaxis=fetchSize -msgSize=1000 -fetchSize=$((1024*$i))"
+    kafka_cleanup
+done

Propchange: incubator/kafka/trunk/perf/util-bin/run-fetchsize-test.sh
------------------------------------------------------------------------------
    svn:executable = *
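
This script and the other run-*-test.sh wrappers below share the same calling convention. A usage sketch with illustrative values (the time argument is passed straight through to run-simulator.sh as -time):

    # args: broker login, simulator login, test time, report directory
    util-bin/run-fetchsize-test.sh user@broker-host user@sim-host 60 /tmp/perf-report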

Added: incubator/kafka/trunk/perf/util-bin/run-msgsize-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-msgsize-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-msgsize-test.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-msgsize-test.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+for i in 1 200 `seq -s " " 1000 1000 10000` ;
+do
+    kafka_startup
+    ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=10  -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=20 -numProducer=40 -xaxis=msgSize -msgSize=$i"
+    kafka_cleanup
+done

Propchange: incubator/kafka/trunk/perf/util-bin/run-msgsize-test.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/perf/util-bin/run-numconsumer-sustained.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-numconsumer-sustained.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-numconsumer-sustained.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-numconsumer-sustained.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+for i in 1 `seq -s " " 10 10 50` ;
+do
+    kafka_startup
+    ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=10  -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=$i -numProducer=10 -xaxis=numConsumer"
+    kafka_cleanup
+done

Propchange: incubator/kafka/trunk/perf/util-bin/run-numconsumer-sustained.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/perf/util-bin/run-numconsumer-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-numconsumer-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-numconsumer-test.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-numconsumer-test.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+kafka_startup
+# You need to twidle this time value depending on test time below
+ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=1  -reportFile=$REPORT_FILE -time=7 -numConsumer=0 -numProducer=10 -xaxis=numConsumer"
+sleep 20
+
+for i in 1 `seq -s " " 10 10 50` ;
+do
+    ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=1  -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=$i -numProducer=0 -xaxis=numConsumer"
+    sleep 10
+done
+
+kafka_cleanup

Propchange: incubator/kafka/trunk/perf/util-bin/run-numconsumer-test.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/perf/util-bin/run-numproducer-single-topic.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-numproducer-single-topic.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-numproducer-single-topic.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-numproducer-single-topic.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+for i in 1 `seq -s " " 10 10 50` ;
+do
+    kafka_startup
+    ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=1  -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=0 -numProducer=$i -xaxis=numProducer"
+    kafka_cleanup
+done

Propchange: incubator/kafka/trunk/perf/util-bin/run-numproducer-single-topic.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/perf/util-bin/run-numproducer-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-numproducer-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-numproducer-test.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-numproducer-test.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+for i in 1 `seq -s " " 10 10 50` ;
+do
+    kafka_startup
+    ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=10  -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=20 -numProducer=$i -xaxis=numProducer"
+    kafka_cleanup
+done

Propchange: incubator/kafka/trunk/perf/util-bin/run-numproducer-test.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/perf/util-bin/run-numtopic-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/util-bin/run-numtopic-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/util-bin/run-numtopic-test.sh (added)
+++ incubator/kafka/trunk/perf/util-bin/run-numtopic-test.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+REMOTE_KAFKA_LOGIN=$1 # user@host format
+REMOTE_SIM_LOGIN=$2
+TEST_TIME=$3
+REPORT_FILE=$4
+
+. `dirname $0`/remote-kafka-env.sh
+
+for i in 1 `seq -s " " 10 10 50` ;
+do
+    kafka_startup
+    ssh $REMOTE_SIM_LOGIN "$SIMULATOR_SCRIPT -kafkaServer=$KAFKA_SERVER -numTopic=$i  -reportFile=$REPORT_FILE -time=$TEST_TIME -numConsumer=20 -numProducer=40 -xaxis=numTopic"
+    kafka_cleanup
+done

Propchange: incubator/kafka/trunk/perf/util-bin/run-numtopic-test.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/project/build.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/project/build.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/project/build.properties (added)
+++ incubator/kafka/trunk/project/build.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,10 @@
+#Project properties
+#Mon Feb 28 11:55:49 PST 2011
+project.name=Kafka
+sbt.version=0.7.5
+project.version=0.7
+build.scala.versions=2.8.0
+contrib.root.dir=contrib
+lib.dir=lib
+target.dir=target/scala_2.8.0
+dist.dir=dist

Added: incubator/kafka/trunk/project/build/KafkaProject.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/project/build/KafkaProject.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/project/build/KafkaProject.scala (added)
+++ incubator/kafka/trunk/project/build/KafkaProject.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,190 @@
+import sbt._
+
+class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject {
+  lazy val core = project("core", "core-kafka", new CoreKafkaProject(_))
+  lazy val examples = project("examples", "java-examples", new KafkaExamplesProject(_), core)
+  lazy val perf = project("perf", "perf", new KafkaPerfProject(_), core)
+  lazy val contrib = project("contrib", "contrib", new ContribProject(_))
+
+  lazy val releaseZipTask = core.packageDistTask
+
+  val releaseZipDescription = "Compiles every sub project, runs unit tests, creates a deployable release zip file with dependencies, config, and scripts."
+  lazy val releaseZip = releaseZipTask dependsOn(core.corePackageAction, core.test, examples.examplesPackageAction, perf.perfPackageAction,
+    contrib.producerPackageAction, contrib.consumerPackageAction) describedAs releaseZipDescription
+
+  class CoreKafkaProject(info: ProjectInfo) extends DefaultProject(info)
+     with IdeaProject with CoreDependencies with TestDependencies {
+   val corePackageAction = packageAllAction
+
+  //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
+  // some dependencies on various sun and javax packages.
+   override def ivyXML =
+    <dependencies>
+      <exclude module="javax"/>
+      <exclude module="jmxri"/>
+      <exclude module="jmxtools"/>
+      <exclude module="mail"/>
+      <exclude module="jms"/>
+    </dependencies>
+
+    override def repositories = Set(ScalaToolsSnapshots, "JBoss Maven 2 Repository" at "http://repository.jboss.com/maven2",
+      "Oracle Maven 2 Repository" at "http://download.oracle.com/maven", "maven.org" at "http://repo2.maven.org/maven2/")
+
+    override def artifactID = "kafka"
+    override def filterScalaJars = false
+
+    // build the executable jar's classpath.
+    // (why is it necessary to explicitly remove the target/{classes,resources} paths? hm.)
+    def dependentJars = {
+      val jars =
+      publicClasspath +++ mainDependencies.scalaJars --- mainCompilePath --- mainResourcesOutputPath
+      if (jars.get.find { jar => jar.name.startsWith("scala-library-") }.isDefined) {
+        // workaround bug in sbt: if the compiler is explicitly included, don't include 2 versions
+        // of the library.
+        jars --- jars.filter { jar =>
+          jar.absolutePath.contains("/boot/") && jar.name == "scala-library.jar"
+        }
+      } else {
+        jars
+      }
+    }
+
+    def dependentJarNames = dependentJars.getFiles.map(_.getName).filter(_.endsWith(".jar"))
+    override def manifestClassPath = Some(dependentJarNames.map { "libs/" + _ }.mkString(" "))
+
+    def distName = (artifactID + "-" + projectVersion.value)
+    def distPath = "dist" / distName ##
+
+    def configPath = "config" ##
+    def configOutputPath = distPath / "config"
+
+    def binPath = "bin" ##
+    def binOutputPath = distPath / "bin"
+
+    def distZipName = {
+      "%s-%s.zip".format(artifactID, projectVersion.value)
+    }
+
+    lazy val packageDistTask = task {
+      distPath.asFile.mkdirs()
+      (distPath / "libs").asFile.mkdirs()
+      binOutputPath.asFile.mkdirs()
+      configOutputPath.asFile.mkdirs()
+
+      FileUtilities.copyFlat(List(jarPath), distPath, log).left.toOption orElse
+              FileUtilities.copyFlat(dependentJars.get, distPath / "libs", log).left.toOption orElse
+              FileUtilities.copy((configPath ***).get, configOutputPath, log).left.toOption orElse
+              FileUtilities.copy((binPath ***).get, binOutputPath, log).left.toOption orElse
+              FileUtilities.zip((("dist" / distName) ##).get, "dist" / distZipName, true, log)
+      None
+    }
+
+    val PackageDistDescription = "Creates a deployable zip file with dependencies, config, and scripts."
+    lazy val packageDist = packageDistTask dependsOn(`package`, `test`) describedAs PackageDistDescription
+
+    val cleanDist = cleanTask("dist" ##) describedAs("Erase any packaged distributions.")
+    override def cleanAction = super.cleanAction dependsOn(cleanDist)
+
+    override def javaCompileOptions = super.javaCompileOptions ++
+      List(JavaCompileOption("-source"), JavaCompileOption("1.5"))
+  }
+
+  class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info)
+     with IdeaProject
+     with CoreDependencies {
+    val examplesPackageAction = packageAllAction
+    val dependsOnCore = core
+  //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
+  // some dependencies on various sun and javax packages.
+   override def ivyXML =
+    <dependencies>
+      <exclude module="javax"/>
+      <exclude module="jmxri"/>
+      <exclude module="jmxtools"/>
+      <exclude module="mail"/>
+      <exclude module="jms"/>
+    </dependencies>
+
+    override def artifactID = "kafka-java-examples"
+    override def filterScalaJars = false
+  }
+
+  class KafkaPerfProject(info: ProjectInfo) extends DefaultProject(info)
+      with IdeaProject
+      with CoreDependencies {
+    val perfPackageAction = packageAllAction
+    val dependsOnCore = core
+  //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
+  // some dependencies on various sun and javax packages.
+   override def ivyXML =
+    <dependencies>
+      <exclude module="javax"/>
+      <exclude module="jmxri"/>
+      <exclude module="jmxtools"/>
+      <exclude module="mail"/>
+      <exclude module="jms"/>
+    </dependencies>
+
+    override def artifactID = "kafka-perf"
+    override def filterScalaJars = false
+  }
+
+  class ContribProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject {
+    lazy val hadoopProducer = project("hadoop-producer", "hadoop producer",
+                                      new HadoopProducerProject(_), core)
+    lazy val hadoopConsumer = project("hadoop-consumer", "hadoop consumer",
+                                      new HadoopConsumerProject(_), core)
+
+    val producerPackageAction = hadoopProducer.producerPackageAction
+    val consumerPackageAction = hadoopConsumer.consumerPackageAction
+
+    class HadoopProducerProject(info: ProjectInfo) extends DefaultProject(info)
+      with IdeaProject
+      with CoreDependencies {
+      val producerPackageAction = packageAllAction
+      override def ivyXML =
+       <dependencies>
+         <exclude module="netty"/>
+           <exclude module="javax"/>
+           <exclude module="jmxri"/>
+           <exclude module="jmxtools"/>
+           <exclude module="mail"/>
+           <exclude module="jms"/>
+       </dependencies>
+
+      val avro = "org.apache.avro" % "avro" % "1.4.1"
+      val jacksonCore = "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5"
+      val jacksonMapper = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5"
+    }
+
+    class HadoopConsumerProject(info: ProjectInfo) extends DefaultProject(info)
+      with IdeaProject
+      with CoreDependencies {
+      val consumerPackageAction = packageAllAction
+      override def ivyXML =
+       <dependencies>
+         <exclude module="netty"/>
+           <exclude module="javax"/>
+           <exclude module="jmxri"/>
+           <exclude module="jmxtools"/>
+           <exclude module="mail"/>
+           <exclude module="jms"/>
+       </dependencies>
+
+      val jodaTime = "joda-time" % "joda-time" % "1.6"
+      val httpclient = "commons-httpclient" % "commons-httpclient" % "3.1"
+    }
+  }
+
+  trait TestDependencies {
+    val easymock = "org.easymock" % "easymock" % "3.0" % "test"
+    val junit = "junit" % "junit" % "4.1" % "test"
+    val scalaTest = "org.scalatest" % "scalatest" % "1.2" % "test"
+  }
+
+  trait CoreDependencies {
+    val log4j = "log4j" % "log4j" % "1.2.15"
+    val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
+  }
+
+}

Added: incubator/kafka/trunk/project/plugins/Plugins.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/project/plugins/Plugins.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/project/plugins/Plugins.scala (added)
+++ incubator/kafka/trunk/project/plugins/Plugins.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,6 @@
+import sbt._
+
+class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
+  val repo = "GH-pages repo" at "http://mpeltonen.github.com/maven/"
+  val idea = "com.github.mpeltonen" % "sbt-idea-plugin" % "0.1-SNAPSHOT"
+}

Added: incubator/kafka/trunk/sbt
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/sbt?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/sbt (added)
+++ incubator/kafka/trunk/sbt Mon Aug  1 23:41:24 2011
@@ -0,0 +1 @@
+java -Xmx1024M -XX:MaxPermSize=512m -jar `dirname $0`/lib/sbt-launch.jar "$@"

Propchange: incubator/kafka/trunk/sbt
------------------------------------------------------------------------------
    svn:executable = *
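
With the launcher in place, the build is driven through this wrapper. A usage sketch (assuming lib/sbt-launch.jar exists; task names follow sbt 0.7 conventions):

    # resolve dependencies, then build the jars
    ./sbt update
    ./sbt package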

Added: incubator/kafka/trunk/system_test/embedded_consumer/README
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/README?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/README (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/README Mon Aug  1 23:41:24 2011
@@ -0,0 +1,8 @@
+This test replicates messages from 3 kafka brokers to 2 other kafka brokers using the embedded consumer.
+At the end, the messages produced at the source brokers should match those at the target brokers.
+
+To run this test, do
+bin/run-test.sh
+
+The expected output is given in bin/expected.out. Only one thing is important:
+1. The output should have a line "test passed".
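
A quick way to run it and check for that line (paths relative to this directory):

    bin/run-test.sh 2>&1 | tee test.out
    grep -q "test passed" test.out && echo OK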

Added: incubator/kafka/trunk/system_test/embedded_consumer/bin/expected.out
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/bin/expected.out?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/bin/expected.out (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/bin/expected.out Mon Aug  1 23:41:24 2011
@@ -0,0 +1,18 @@
+start the servers ...
+start producing messages ...
+wait for consumer to finish consuming ...
+[2011-05-17 14:49:11,605] INFO Creating async producer for broker id = 2 at localhost:9091 (kafka.producer.ProducerPool)
+[2011-05-17 14:49:11,606] INFO Creating async producer for broker id = 1 at localhost:9092 (kafka.producer.ProducerPool)
+[2011-05-17 14:49:11,607] INFO Creating async producer for broker id = 3 at localhost:9090 (kafka.producer.ProducerPool)
+thread 0: 400000 messages sent 3514012.1233 nMsg/sec 3.3453 MBs/sec
+[2011-05-17 14:49:34,382] INFO Closing all async producers (kafka.producer.ProducerPool)
+[2011-05-17 14:49:34,383] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+[2011-05-17 14:49:34,384] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+[2011-05-17 14:49:34,385] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+Total Num Messages: 400000 bytes: 79859641 in 22.93 secs
+Messages/sec: 17444.3960
+MB/sec: 3.3214
+test passed
+stopping the servers
+bin/../../../bin/zookeeper-server-start.sh: line 9: 22584 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
+bin/../../../bin/zookeeper-server-start.sh: line 9: 22585 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@

Added: incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,79 @@
+#!/bin/bash
+
+num_messages=400000
+message_size=400
+
+base_dir=$(dirname $0)/..
+
+rm -rf /tmp/zookeeper_source
+rm -rf /tmp/zookeeper_target
+rm -rf /tmp/kafka-source1-logs
+mkdir /tmp/kafka-source1-logs
+mkdir /tmp/kafka-source1-logs/test01-0
+touch /tmp/kafka-source1-logs/test01-0/00000000000000000000.kafka
+rm -rf /tmp/kafka-source2-logs
+mkdir /tmp/kafka-source2-logs
+mkdir /tmp/kafka-source2-logs/test01-0
+touch /tmp/kafka-source2-logs/test01-0/00000000000000000000.kafka
+rm -rf /tmp/kafka-source3-logs
+mkdir /tmp/kafka-source3-logs
+mkdir /tmp/kafka-source3-logs/test01-0
+touch /tmp/kafka-source3-logs/test01-0/00000000000000000000.kafka
+rm -rf /tmp/kafka-target1-logs
+rm -rf /tmp/kafka-target2-logs
+
+echo "start the servers ..."
+$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log &
+$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
+$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log &
+$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log &
+$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log &
+$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target1.log &
+$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target2.log &
+
+sleep 4
+echo "start producing messages ..."
+$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval 400000 --async --delay-btw-batch-ms 10 &
+
+echo "wait for consumer to finish consuming ..."
+cur1_offset="-1"
+cur2_offset="-1"
+quit1=0
+quit2=0
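+# poll each target broker's log-end offset; once an offset stops advancing
+# between polls, that target's consumer is considered done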
+while [ $quit1 -eq 0 ] || [ $quit2 -eq 0 ]
+do
+  sleep 2
+  target1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+  if [ $target1_size -eq $cur1_offset ]
+  then
+    quit1=1
+  fi
+  cur1_offset=$target1_size
+  target2_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+  if [ $target2_size -eq $cur2_offset ]
+  then
+    quit2=1
+  fi
+  cur2_offset=$target2_size
+done
+
+sleep 2
+source_part0_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+source_part1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+source_part2_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+target_part0_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+target_part1_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+
+expected_size=`expr $source_part0_size + $source_part1_size + $source_part2_size`
+actual_size=`expr $target_part0_size + $target_part1_size`
+if [ $expected_size != $actual_size ]
+then
+   echo "source size: $expected_size target size: $actual_size test failed!!! look at it!!!"
+else
+   echo "test passed"
+fi
+
+echo "stopping the servers"
+ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null
+sleep 2
+ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null

Propchange: incubator/kafka/trunk/system_test/embedded_consumer/bin/run-test.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/consumer.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,14 @@
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=group1
+
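+# topics to replicate, given as topic:count pairs (the count is assumed here
+# to be the number of consumer streams per topic)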
+embeddedconsumer.topics=test01:1

Added: incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_source1.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9092
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source1-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=10000000
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4

Added: incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_source2.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9091
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source2-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4

Added: incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_source3.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=3
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9090
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-source3-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4

Added: incubator/kafka/trunk/system_test/embedded_consumer/config/server_target1.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_target1.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_target1.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_target1.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=1
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9093
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-target1-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4

Added: incubator/kafka/trunk/system_test/embedded_consumer/config/server_target2.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/server_target2.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/server_target2.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/server_target2.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=2
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9094
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-target2-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2182
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# time based topic flusher time rate in ms
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4

Added: incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_source.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_source.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_source.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_source.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,4 @@
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_source
+# the port at which the clients will connect
+clientPort=2181

Added: incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_target.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_target.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_target.properties (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/config/zookeeper_target.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,4 @@
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_target
+# the port at which the clients will connect
+clientPort=2182
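
The two ZooKeeper configs differ only in dataDir and clientPort, so the source and
target instances can run side by side on one machine (zk.connect=localhost:2182 in
server_target2.properties points at the target instance). A minimal sketch of
starting both, assuming the repository root as the working directory:

    bin/zookeeper-server-start.sh system_test/embedded_consumer/config/zookeeper_source.properties &
    bin/zookeeper-server-start.sh system_test/embedded_consumer/config/zookeeper_target.properties &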

Added: incubator/kafka/trunk/system_test/embedded_consumer/expected.out
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/embedded_consumer/expected.out?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/embedded_consumer/expected.out (added)
+++ incubator/kafka/trunk/system_test/embedded_consumer/expected.out Mon Aug  1 23:41:24 2011
@@ -0,0 +1,11 @@
+start the servers ...
+start producing messages ...
+Total Num Messages: 10000000 bytes: 1994374785 in 106.076 secs
+Messages/sec: 94272.0314
+MB/sec: 17.9304
+[2011-05-02 11:50:29,022] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
+wait for consumer to finish consuming ...
+test passed
+bin/../../../bin/kafka-server-start.sh: line 11:   359 Terminated              $(dirname $0)/kafka-run-class.sh kafka.Kafka $@
+bin/../../../bin/zookeeper-server-start.sh: line 9:   357 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
+bin/../../../bin/zookeeper-server-start.sh: line 9:   358 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
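
The summary rates in this expected output follow from the totals line:

    10000000 messages / 106.076 s             = 94272.03 messages/sec
    1994374785 bytes / 106.076 s / 1048576    = 17.9304 MB/sec

so MB/sec here means 2^20 bytes per second.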

Added: incubator/kafka/trunk/system_test/producer_perf/README
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/README?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/README (added)
+++ incubator/kafka/trunk/system_test/producer_perf/README Mon Aug  1 23:41:24 2011
@@ -0,0 +1,9 @@
+This test produces a large number of messages to a broker. It measures the producer
+throughput and verifies that the amount of data the broker received matches what was sent.
+
+To run this test, do
+bin/run-test.sh
+
+The expected output is given in expected.out. There are two things to pay attention to:
+1. The output should have a line "test passed".
+2. The throughput from the producer should be around 300,000 messages/sec on a typical machine (the sample bin/expected.out was produced with --delay-btw-batch-ms 10, which throttles the overall rate to roughly 18,400 messages/sec).
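
A minimal way to run the test and check both conditions mechanically (a sketch;
test.out is just a hypothetical scratch file name):

    bin/run-test.sh 2>&1 | tee test.out
    grep -q "test passed" test.out && echo OK
    grep "Messages/sec" test.out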

Added: incubator/kafka/trunk/system_test/producer_perf/bin/expected.out
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/bin/expected.out?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/bin/expected.out (added)
+++ incubator/kafka/trunk/system_test/producer_perf/bin/expected.out Mon Aug  1 23:41:24 2011
@@ -0,0 +1,32 @@
+start the servers ...
+start producing 2000000 messages ...
+[2011-05-17 14:31:12,568] INFO Creating async producer for broker id = 0 at localhost:9092 (kafka.producer.ProducerPool)
+thread 0: 100000 messages sent 3272786.7779 nMsg/sec 3.1212 MBs/sec
+thread 0: 200000 messages sent 3685956.5057 nMsg/sec 3.5152 MBs/sec
+thread 0: 300000 messages sent 3717472.1190 nMsg/sec 3.5453 MBs/sec
+thread 0: 400000 messages sent 3730647.2673 nMsg/sec 3.5578 MBs/sec
+thread 0: 500000 messages sent 3730647.2673 nMsg/sec 3.5578 MBs/sec
+thread 0: 600000 messages sent 3722315.2801 nMsg/sec 3.5499 MBs/sec
+thread 0: 700000 messages sent 3718854.5928 nMsg/sec 3.5466 MBs/sec
+thread 0: 800000 messages sent 3714020.4271 nMsg/sec 3.5420 MBs/sec
+thread 0: 900000 messages sent 3713330.8578 nMsg/sec 3.5413 MBs/sec
+thread 0: 1000000 messages sent 3710575.1391 nMsg/sec 3.5387 MBs/sec
+thread 0: 1100000 messages sent 3711263.6853 nMsg/sec 3.5393 MBs/sec
+thread 0: 1200000 messages sent 3716090.6726 nMsg/sec 3.5439 MBs/sec
+thread 0: 1300000 messages sent 3709198.8131 nMsg/sec 3.5374 MBs/sec
+thread 0: 1400000 messages sent 3705762.4606 nMsg/sec 3.5341 MBs/sec
+thread 0: 1500000 messages sent 3701647.2330 nMsg/sec 3.5302 MBs/sec
+thread 0: 1600000 messages sent 3696174.4594 nMsg/sec 3.5249 MBs/sec
+thread 0: 1700000 messages sent 3703703.7037 nMsg/sec 3.5321 MBs/sec
+thread 0: 1800000 messages sent 3703017.9596 nMsg/sec 3.5315 MBs/sec
+thread 0: 1900000 messages sent 3700277.5208 nMsg/sec 3.5289 MBs/sec
+thread 0: 2000000 messages sent 3702332.4695 nMsg/sec 3.5308 MBs/sec
+[2011-05-17 14:33:01,102] INFO Closing all async producers (kafka.producer.ProducerPool)
+[2011-05-17 14:33:01,103] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
+Total Num Messages: 2000000 bytes: 400000000 in 108.678 secs
+Messages/sec: 18402.9886
+MB/sec: 3.5101
+wait for data to be persisted
+test passed
+bin/../../../bin/kafka-server-start.sh: line 11: 21110 Terminated              $(dirname $0)/kafka-run-class.sh kafka.Kafka $@
+bin/../../../bin/zookeeper-server-start.sh: line 9: 21109 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
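
These summary lines are consistent with the test parameters: 2000000 messages of
200 bytes each is 400000000 bytes of payload, and

    2000000 messages / 108.678 s              = 18402.99 messages/sec
    400000000 bytes / 108.678 s / 1048576     = 3.5101 MB/sec

matching the Messages/sec and MB/sec figures above.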

Added: incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh (added)
+++ incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,49 @@
+#!/bin/bash
+
+num_messages=2000000
+message_size=200
+batch_size=200
+
+base_dir=$(dirname $0)/..
+
+rm -rf /tmp/zookeeper
+rm -rf /tmp/kafka-logs
+
+echo "start the servers ..."
+$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper.properties > $base_dir/zookeeper.log 2>&1 &
+$base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties > $base_dir/kafka.log 2>&1 &
+
+sleep 4
+echo "start producing $num_messages messages ..."
+$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size $batch_size --threads 1 --reporting-interval 100000 --async --delay-btw-batch-ms 10 --compression-codec 1
+
+echo "wait for data to be persisted" 
+cur_offset="-1"
+quit=0
+while [ $quit -eq 0 ]
+do
+  sleep 2
+  target_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+  if [ "$target_size" -eq "$cur_offset" ]
+  then
+    quit=1
+  fi
+  cur_offset=$target_size
+done
+
+sleep 2
+actual_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+num_batches=`expr $num_messages \/ $batch_size`
+expected_size=`expr $num_batches \* 262`
+
+if [ "$actual_size" -ne "$expected_size" ]
+then
+   echo "actual size: $actual_size expected size: $expected_size test failed!!! look at it!!!"
+else
+   echo "test passed"
+fi
+
+ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
+sleep 2
+ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
+
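
The size check in this script works on whole batches: 2000000 messages in batches
of 200 is 10000 batches, and with --compression-codec 1 each batch is written as a
single compressed message, which in this setup occupies 262 bytes on disk
(presumably a measured constant; it would need re-measuring if the payload or
codec changed). Hence:

    expected_size = (2000000 / 200) * 262 = 2620000 bytes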

Propchange: incubator/kafka/trunk/system_test/producer_perf/bin/run-compression-test.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh (added)
+++ incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,48 @@
+#!/bin/bash
+
+num_messages=2000000
+message_size=200
+
+base_dir=$(dirname $0)/..
+
+rm -rf /tmp/zookeeper
+rm -rf /tmp/kafka-logs
+
+echo "start the servers ..."
+$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper.properties > $base_dir/zookeeper.log 2>&1 &
+$base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties > $base_dir/kafka.log 2>&1 &
+
+sleep 4
+echo "start producing $num_messages messages ..."
+$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 --async --delay-btw-batch-ms 10
+
+echo "wait for data to be persisted" 
+cur_offset="-1"
+quit=0
+while [ $quit -eq 0 ]
+do
+  sleep 2
+  target_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+  if [ "$target_size" -eq "$cur_offset" ]
+  then
+    quit=1
+  fi
+  cur_offset=$target_size
+done
+
+sleep 2
+actual_size=`$base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
+msg_full_size=`expr $message_size + 10`   # 10 bytes of per-message overhead on disk
+expected_size=`expr $num_messages \* $msg_full_size`
+
+if [ "$actual_size" -ne "$expected_size" ]
+then
+   echo "actual size: $actual_size expected size: $expected_size test failed!!! look at it!!!"
+else
+   echo "test passed"
+fi
+
+ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
+sleep 2
+ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
+
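
The size check here is per message: each 200-byte payload is stored with 10 bytes
of framing (the msg_full_size calculation above), so

    expected_size = 2000000 * (200 + 10) = 420000000 bytes

which should equal the log-end offset reported by GetOffsetShell once all data has
been flushed.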

Propchange: incubator/kafka/trunk/system_test/producer_perf/bin/run-test.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/system_test/producer_perf/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/config/server.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/config/server.properties (added)
+++ incubator/kafka/trunk/system_test/producer_perf/config/server.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=0
+
+# hostname of the broker. If not set, the value returned by getLocalHost
+# is used; on machines with multiple network interfaces getLocalHost
+# may not pick the interface you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9092
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# the number of messages to accept without flushing the log to disk
+log.flush.interval=600
+
+# set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=1000
+
+# the interval in ms at which the time-based log flusher runs
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4
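
The "wait for data to be persisted" loop in the test scripts polls the log-end
offset until it stops changing; with log.default.flush.interval.ms=1000 above, the
broker flushes roughly once a second, so the offset converges quickly. The same
query can be issued by hand with the tool the scripts use:

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1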

Added: incubator/kafka/trunk/system_test/producer_perf/config/zookeeper.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/producer_perf/config/zookeeper.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/system_test/producer_perf/config/zookeeper.properties (added)
+++ incubator/kafka/trunk/system_test/producer_perf/config/zookeeper.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,4 @@
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper
+# the port at which the clients will connect
+clientPort=2181


