hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r740131 - in /hadoop/zookeeper/trunk: ./ src/contrib/fatjar/conf/ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/systest/ src/java/systest/org/apache/zookeeper/test/system/ src/java/tes...
Date Mon, 02 Feb 2009 22:26:04 GMT
Author: mahadev
Date: Mon Feb  2 22:26:03 2009
New Revision: 740131

URL: http://svn.apache.org/viewvc?rev=740131&view=rev
Log:
ZOOKEEPER-286. Make GenerateLoad use InstanceContainers. (breed via mahadev)

Added:
    hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java
      - copied, changed from r740129, hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/GenerateLoad.java
Removed:
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/GenerateLoad.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerStats.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/systest/README.txt
    hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java
    hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java
    hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java
    hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java
    hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java
    hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Mon Feb  2 22:26:03 2009
@@ -145,6 +145,8 @@
    cleanup datadir (snaps/logs) (mahadev via phunt)
 
    ZOOKEEPER-69. ZooKeeper logo
+   
+   ZOOKEEPER-286. Make GenerateLoad use InstanceContainers. (breed via mahadev)
 
 NEW FEATURES:
 

Modified: hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses (original)
+++ hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses Mon Feb  2 22:26:03 2009
@@ -3,7 +3,7 @@
 ::Server Commands
 server:org.apache.zookeeper.server.quorum.QuorumPeerMain:Start ZooKeeper server
 ::Test Commands
-generateLoad:org.apache.zookeeper.test.GenerateLoad:A distributed load generator for testing
+generateLoad:org.apache.zookeeper.test.system.GenerateLoad:A distributed load generator for testing
 quorumBench:org.apache.zookeeper.server.QuorumBenchmark:A benchmark of just the quorum protocol
 abBench:org.apache.zookeeper.server.quorum.AtomicBroadcastBenchmark:A benchmark of just the atomic broadcast
 ic:org.apache.zookeeper.test.system.InstanceContainer:A container that will instantiate classes as directed by an instance manager

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Mon Feb  2 22:26:03 2009
@@ -376,7 +376,9 @@
                         stats.packetsSent++;
                         /* We've sent the whole buffer, so drop the buffer */
                         sent -= bb.remaining();
-                        zk.serverStats().incrementPacketsSent();
+                        if (zk != null) {
+                            zk.serverStats().incrementPacketsSent();
+                        }
                         outgoingBuffers.remove();
                     }
                     // ZooLog.logTraceMessage(LOG,
@@ -589,6 +591,7 @@
                     sb.append(zk.dataTree.dumpEphemerals()).append("\n");
                     sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
                 }
+                sendBuffer(NIOServerCnxn.closeConn);
                 k.interestOps(SelectionKey.OP_WRITE);
                 return;
             } else if (len == reqsCmd) {
@@ -603,6 +606,7 @@
                     }
                 }
                 sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
+                sendBuffer(NIOServerCnxn.closeConn);
                 k.interestOps(SelectionKey.OP_WRITE);
                 return;
             } else if (len == statCmd) {
@@ -626,6 +630,7 @@
                     sb.append("ZooKeeperServer not running\n");
 
                 sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
+                sendBuffer(NIOServerCnxn.closeConn);
                 k.interestOps(SelectionKey.OP_WRITE);
                 return;
             } else if (len == enviCmd) {
@@ -642,6 +647,7 @@
                 }
 
                 sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
+                sendBuffer(NIOServerCnxn.closeConn);
                 k.interestOps(SelectionKey.OP_WRITE);
                 return;
             } else if (len == srstCmd) {
@@ -650,6 +656,7 @@
                 zk.serverStats().reset();
 
                 sendBuffer(ByteBuffer.wrap("Stats reset.\n".getBytes()));
+                sendBuffer(NIOServerCnxn.closeConn);
                 k.interestOps(SelectionKey.OP_WRITE);
                 return;
             }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerStats.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerStats.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerStats.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerStats.java Mon Feb  2 22:26:03 2009
@@ -35,6 +35,7 @@
     public interface Provider {
         public long getOutstandingRequests();
         public long getLastProcessedZxid();
+        public String getState();
     }
     
     public ServerStats(Provider provider) {
@@ -74,7 +75,7 @@
     }
 
     public String getServerState() {
-        return "standalone";
+        return provider.getState();
     }
     
     @Override

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Mon Feb  2 22:26:03 2009
@@ -56,7 +56,7 @@
 
     public SyncRequestProcessor(ZooKeeperServer zks,
             RequestProcessor nextProcessor) {
-        super("SyncThread:" + zks.getClientPort());
+        super("SyncThread:" + zks.getServerId());
         this.zks = zks;
         this.nextProcessor = nextProcessor;
         start();

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Mon Feb  2 22:26:03 2009
@@ -119,7 +119,6 @@
     int requestsInProcess;
     List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
     private NIOServerCnxn.Factory serverCnxnFactory;
-    private int clientPort;
 
     private final ServerStats serverStats;
 
@@ -637,13 +636,9 @@
     }
     
     public int getClientPort() {
-        return clientPort;
+        return serverCnxnFactory != null ? serverCnxnFactory.ss.socket().getLocalPort() : -1;
     }
 
-    public void setClientPort(int clientPort) {
-        this.clientPort = clientPort;
-    }
-    
     public void setTxnLogFactory(FileTxnSnapLog txnLog) {
         this.txnLogFactory = txnLog;
     }
@@ -651,4 +646,8 @@
     public FileTxnSnapLog getTxnLogFactory() {
         return this.txnLogFactory;
     }
+
+    public String getState() {
+        return "standalone";
+    }
 }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java Mon Feb  2 22:26:03 2009
@@ -61,7 +61,6 @@
             public ZooKeeperServer createServer() throws IOException {
                 // create a file logger url from the command line args
                 ZooKeeperServer zks = new ZooKeeperServer();
-                zks.setClientPort(ServerConfig.getClientPort());
 
                FileTxnSnapLog ftxn = new FileTxnSnapLog(new 
                        File(ServerConfig.getDataLogDir()),

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Mon Feb  2 22:26:03 2009
@@ -45,7 +45,7 @@
 
     public FollowerRequestProcessor(FollowerZooKeeperServer zks,
             RequestProcessor nextProcessor) {
-        super("FollowerRequestProcessor:" + zks.getClientPort());
+        super("FollowerRequestProcessor:" + zks.getServerId());
         this.zks = zks;
         this.nextProcessor = nextProcessor;
         start();

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java Mon Feb  2 22:26:03 2009
@@ -83,7 +83,7 @@
     protected void setupRequestProcessors() {
         RequestProcessor finalProcessor = new FinalRequestProcessor(this);
         commitProcessor = new CommitProcessor(finalProcessor,
-                Integer.toString(getClientPort()), true);
+                Long.toString(getServerId()), true);
         firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
         syncProcessor = new SyncRequestProcessor(this,
                 new SendAckRequestProcessor(getFollower()));
@@ -229,4 +229,9 @@
         }
         jmxServerBean = null;
     }
+    
+    @Override
+    public String getState() {
+        return "follower";
+    }
 }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Mon Feb  2 22:26:03 2009
@@ -62,7 +62,7 @@
         RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                 finalProcessor, getLeader().toBeApplied);
         commitProcessor = new CommitProcessor(toBeAppliedProcessor,
-                Integer.toString(getClientPort()), false);
+                Long.toString(getServerId()), false);
         RequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                 commitProcessor);
         firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
@@ -138,4 +138,9 @@
         }
         jmxServerBean = null;
     }
+    
+    @Override
+    public String getState() {
+        return "leader";
+    }
 }

Modified: hadoop/zookeeper/trunk/src/java/systest/README.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/README.txt?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/README.txt (original)
+++ hadoop/zookeeper/trunk/src/java/systest/README.txt Mon Feb  2 22:26:03 2009
@@ -40,3 +40,23 @@
 java -DsysTest.zkHostPort=hostA:2181  -jar build/contrib/fatjar/zookeeper-dev-fatjar.jar systest org.apache.zookeeper.test.system.SimpleSysTest
 
 where hostA is running the zk server started in step 2) above
+
+InstanceContainers can also be used to run a the saturation benchmark. The
+first two steps are the same as the system test. Step 3 is almost the same:
+
+3) start the InstanceContainer on each host:
+
+e.g. java -jar zookeeper-<version>-fatjar.jar ic <name> <zkHostPort> <prefix>
+
+note prefix can be /sysTest or any other path. If you do use /sysTest, make
+sure the system test isn't running when you run the benchmark.
+
+4) run GenerateLoad using the following
+
+java -jar build/contrib/fatjar/zookeeper-dev-fatjar.jar generateLoad <zkHostPort> <prefix> #servers #clients
+
+Once GenerateLoad is started, it will read commands from stdin. Usually
+the only command you need to know is "percentage" which sets the percentage
+of writes to use in the requests. Once a percentage is set, the benchmark
+will start. "percentage 0" will cause only reads to be issued and
+"percentage 100" will cause only writes to be issued.

Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java Mon Feb  2 22:26:03 2009
@@ -23,17 +23,14 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.security.MessageDigest;
 import java.util.HashMap;
 
 import junit.framework.TestCase;
 
-import org.apache.log4j.Logger;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.QuorumStats;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.junit.runner.JUnitCore;
 
@@ -121,14 +118,13 @@
         StringBuilder sbServer = new StringBuilder();
         try {
             for(int i = 0; i < count; i++) {
-                String r = QuorumPeerInstance.createServer(im, i);
+                String r[] = QuorumPeerInstance.createServer(im, i);
                 if (i > 0) {
                     sbClient.append(',');
                     sbServer.append(',');
                 }
-                String parts[] = r.split(",");
-                sbClient.append(parts[0]);
-                sbServer.append(parts[1]);
+                sbClient.append(r[0]);
+                sbServer.append(r[1]);
             }
             serverHostPort = sbClient.toString();
             quorumHostPort = sbServer.toString();

Copied: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java (from r740129, hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/GenerateLoad.java)
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java?p2=hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java&p1=hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/GenerateLoad.java&r1=740129&r2=740131&rev=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/GenerateLoad.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java Mon Feb  2 22:26:03 2009
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.zookeeper.test;
+package org.apache.zookeeper.test.system;
 
 import java.io.BufferedReader;
 import java.io.FileNotFoundException;
@@ -25,8 +25,11 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
@@ -79,7 +82,8 @@
     synchronized static void add(long time, int count, Socket s) {
         long interval = time / INTERVAL;
         if (currentInterval == 0 || currentInterval > interval) {
-            System.out.println("Dropping " + count + " for " + new Date(time) + " " + currentInterval + ">" + interval);
+            System.out.println("Dropping " + count + " for " + new Date(time)
+                    + " " + currentInterval + ">" + interval);
             return;
         }
         // We track totals by seconds
@@ -109,20 +113,22 @@
         public void run() {
             try {
                 System.out.println("Connected to " + s);
-                BufferedReader is = new BufferedReader(new InputStreamReader(s.getInputStream()));
+                BufferedReader is = new BufferedReader(new InputStreamReader(s
+                        .getInputStream()));
                 String result;
                 while ((result = is.readLine()) != null) {
                     String timePercentCount[] = result.split(" ");
                     if (timePercentCount.length != 5) {
-                        System.err.println("Got " + result + " from " + s + " exitng.");
+                        System.err.println("Got " + result + " from " + s
+                                + " exitng.");
                         throw new IOException(result);
                     }
                     long time = Long.parseLong(timePercentCount[0]);
-                    //int percent = Integer.parseInt(timePercentCount[1]);
+                    // int percent = Integer.parseInt(timePercentCount[1]);
                     int count = Integer.parseInt(timePercentCount[2]);
                     int errs = Integer.parseInt(timePercentCount[3]);
                     if (errs > 0) {
-                        System.out.println(s+" Got an error! " + errs);
+                        System.out.println(s + " Got an error! " + errs);
                     }
                     add(time, count, s);
                 }
@@ -162,6 +168,7 @@
             try {
                 while (true) {
                     Socket s = ss.accept();
+                    System.err.println("Accepted connection from " + s);
                     slaves.add(new SlaveThread(s));
                 }
             } catch (IOException e) {
@@ -177,6 +184,8 @@
     }
 
     static class ReporterThread extends Thread {
+        static int percentage;
+
         ReporterThread() {
             setDaemon(true);
             start();
@@ -186,7 +195,7 @@
             try {
                 currentInterval = System.currentTimeMillis() / INTERVAL;
                 // Give things time to report;
-                Thread.sleep(INTERVAL*2);
+                Thread.sleep(INTERVAL * 2);
                 long min = 99999;
                 long max = 0;
                 long total = 0;
@@ -196,8 +205,10 @@
                     long lastInterval = currentInterval;
                     currentInterval += 1;
                     long count = remove(lastInterval);
-                    count=count*1000/INTERVAL; // Multiply by 1000 to get reqs/sec
-                    if (lastChange != 0 && (lastChange + INTERVAL*4 + 5000)< now) {
+                    count = count * 1000 / INTERVAL; // Multiply by 1000 to get
+                                                     // reqs/sec
+                    if (lastChange != 0
+                            && (lastChange + INTERVAL * 4 + 5000) < now) {
                         // We only want to print anything if things have had a
                         // chance to change
 
@@ -211,17 +222,13 @@
                         number++;
                         Calendar calendar = Calendar.getInstance();
                         calendar.setTimeInMillis(lastInterval * INTERVAL);
-                        String report = lastInterval + " " + calendar.get(Calendar.HOUR_OF_DAY)
-                                                           + ":" + calendar.get(Calendar.MINUTE)
-                                                           + ":" + calendar.get(Calendar.SECOND)
-                                + " "
-                                + percentage
-                                + "% "
-                                + count
-                                + " "
-                                + min
-                                + " "
-                                + ((double)total / (double)number) + " " + max;
+                        String report = lastInterval + " "
+                                + calendar.get(Calendar.HOUR_OF_DAY) + ":"
+                                + calendar.get(Calendar.MINUTE) + ":"
+                                + calendar.get(Calendar.SECOND) + " "
+                                + percentage + "% " + count + " " + min + " "
+                                + ((double) total / (double) number) + " "
+                                + max;
                         System.err.println(report);
                         if (sf != null) {
                             sf.println(report);
@@ -243,7 +250,7 @@
     synchronized static void sendChange(int percentage) {
         long now = System.currentTimeMillis();
         long start = now;
-        GenerateLoad.percentage = percentage;
+        ReporterThread.percentage = percentage;
         for (SlaveThread st : slaves.toArray(new SlaveThread[0])) {
             st.send(percentage);
         }
@@ -255,214 +262,408 @@
         lastChange = now;
     }
 
-    static int percentage = -1;
+    static public class GeneratorInstance implements Instance {
 
-    static String host;
+        int percentage = -1;
 
-        static Socket s;
+        int errors;
 
-        static int errors;
+        final Object statSync = new Object();
 
-        static final Object statSync = new Object();
+        int finished;
 
-        static int finished;
+        int reads;
 
-        static int reads;
+        int writes;
 
-        static int writes;
+        int rlatency;
 
-        static int rlatency;
+        int wlatency;
 
-        static int wlatency;
+        int outstanding;
+        
+        volatile boolean alive;
 
-        static int outstanding;
+        class ZooKeeperThread extends Thread implements Watcher, DataCallback,
+                StatCallback {
+            String host;
 
-    static class ZooKeeperThread extends Thread implements Watcher, DataCallback,
-            StatCallback {
-        ZooKeeperThread() {
-            setDaemon(true);
-            start();
-            alive = true;
-        }
+            ZooKeeperThread(String host) {
+                setDaemon(true);
+                alive = true;
+                this.host = host;
+                start();
+            }
 
-        static final int outstandingLimit = 100;
+            static final int outstandingLimit = 100;
 
-        synchronized void incOutstanding() throws InterruptedException {
-            outstanding++;
-            while (outstanding > outstandingLimit) {
-                wait();
+            synchronized void incOutstanding() throws InterruptedException {
+                outstanding++;
+                while (outstanding > outstandingLimit) {
+                    wait();
+                }
             }
-        }
 
-        synchronized void decOutstanding() {
-            outstanding--;
-            notifyAll();
-        }
+            synchronized void decOutstanding() {
+                outstanding--;
+                notifyAll();
+            }
 
-        boolean alive;
+            Random r = new Random();
 
-        Random r = new Random();
+            String path;
 
-        String path;
+            ZooKeeper zk;
 
-        ZooKeeper zk;
+            boolean connected;
 
-        public void run() {
-            try {
-                byte bytes[] = new byte[1024];
-                zk = new ZooKeeper(host, 60000, this);
-                for(int i = 0; i < 300; i++) {
+            public void run() {
+                try {
+                    byte bytes[] = new byte[1024];
+                    zk = new ZooKeeper(host, 60000, this);
+                    synchronized (this) {
+                        if (!connected) {
+                            wait(20000);
+                        }
+                    }
+                    for (int i = 0; i < 300; i++) {
+                        try {
+                            Thread.sleep(100);
+                            path = zk.create("/client", new byte[16],
+                                    Ids.OPEN_ACL_UNSAFE,
+                                    CreateMode.EPHEMERAL_SEQUENTIAL);
+                            break;
+                        } catch (KeeperException e) {
+                            LOG.error("keeper exception thrown", e);
+                        }
+                    }
+                    if (path == null) {
+                        System.err.println("Couldn't create a node in /!");
+                        return;
+                    }
+                    while (alive) {
+                        if (r.nextInt(100) < percentage) {
+                            zk.setData(path, bytes, -1, this, System
+                                    .currentTimeMillis());
+                        } else {
+                            zk.getData(path, false, this, System
+                                    .currentTimeMillis());
+                        }
+                        incOutstanding();
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+                    alive = false;
                     try {
-                        Thread.sleep(100);
-                        path = zk.create("/client", new byte[16], Ids.OPEN_ACL_UNSAFE,
-                                CreateMode.EPHEMERAL_SEQUENTIAL);
-                        break;
-                    } catch(KeeperException e) {
-                        LOG.error("keeper exception thrown", e);
-                    }
-                }
-                if (path == null) {
-                    System.err.println("Couldn't create a node in /!");
-                    System.exit(44);
-                }
-                System.err.println("Created: " + s);
-                while (alive) {
-                    if (r.nextInt(100) < percentage) {
-                        zk.setData(path, bytes, -1, this, System.currentTimeMillis());
+                        zk.close();
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+
+            public void process(WatchedEvent event) {
+                System.err.println(event);
+                synchronized (this) {
+                    if (event.getType() == EventType.None) {
+                        connected = (event.getState() == KeeperState.SyncConnected);
+                        notifyAll();
+                    }
+                }
+            }
+
+            public void processResult(int rc, String path, Object ctx, byte[] data,
+                    Stat stat) {
+                decOutstanding();
+                synchronized (statSync) {
+                    if (!alive) {
+                        return;
+                    }
+                    if (rc != 0) {
+                        System.err.println("Got rc = " + rc);
+                        errors++;
                     } else {
-                        zk.getData(path, false, this, System.currentTimeMillis());
+                        finished++;
+                        rlatency += System.currentTimeMillis() - (Long) ctx;
+                        reads++;
                     }
-                    incOutstanding();
                 }
-            } catch (Exception e) {
-                e.printStackTrace();
-                System.exit(3);
-            } finally {
-                alive = false;
             }
-        }
 
-        public void process(WatchedEvent event) {
-            System.err.println(event);
-            synchronized(this) {
-                try {
-                    wait(200);
-                } catch (InterruptedException e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
+            public void processResult(int rc, String path, Object ctx, Stat stat) {
+                decOutstanding();
+                synchronized (statSync) {
+                    if (rc != 0) {
+                        System.err.println("Got rc = " + rc);
+                        errors++;
+                    } else {
+                        finished++;
+                        wlatency += System.currentTimeMillis() - (Long) ctx;
+                        writes++;
+                    }
                 }
             }
-            if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
+        }
+
+        class SenderThread extends Thread {
+            Socket s;
+
+            SenderThread(Socket s) {
+                this.s = s;
+                setDaemon(true);
+                start();
+            }
+
+            public void run() {
                 try {
-                    zk = new ZooKeeper(host, 10000, this);
-                } catch (IOException e) {
+                    OutputStream os = s.getOutputStream();
+                    finished = 0;
+                    errors = 0;
+                    while (alive) {
+                        Thread.sleep(300);
+                        if (percentage == -1 || (finished == 0 && errors == 0)) {
+                            continue;
+                        }
+                        String report = System.currentTimeMillis() + " "
+                                + percentage + " " + finished + " " + errors + " "
+                                + outstanding + "\n";
+                       /* String subreport = reads + " "
+                                + (((double) rlatency) / reads) + " " + writes
+                                + " " + (((double) wlatency / writes)); */
+                        synchronized (statSync) {
+                            finished = 0;
+                            errors = 0;
+                            reads = 0;
+                            writes = 0;
+                            rlatency = 0;
+                            wlatency = 0;
+                        }
+                        os.write(report.getBytes());
+                        //System.out.println("Reporting " + report + "+" + subreport);
+                    }
+                } catch (Exception e) {
                     e.printStackTrace();
                 }
+
             }
         }
 
-        public void processResult(int rc, String path, Object ctx, byte[] data,
-                Stat stat) {
-            decOutstanding();
-            synchronized(statSync) {
-            if (rc != 0) {
-                System.err.println("Got rc = " + rc);
-                errors++;
-            } else {
-                finished++;
-                rlatency += System.currentTimeMillis() - (Long)ctx;
-                reads++;
-            }
-            }
+        Socket s;
+        ZooKeeperThread zkThread;
+        SenderThread sendThread;
+        Reporter r;
+
+        public void configure(final String params) {
+            System.err.println("Got " + params);
+            new Thread() {
+                public void run() {
+                    try {
+                        String parts[] = params.split(" ");
+                        String hostPort[] = parts[1].split(":");
+                        s = new Socket(hostPort[0], Integer.parseInt(hostPort[1]));
+                        zkThread = new ZooKeeperThread(parts[0]);
+                        sendThread = new SenderThread(s);
+                        BufferedReader is = new BufferedReader(new InputStreamReader(s
+                                .getInputStream()));
+                        String line;
+                        while ((line = is.readLine()) != null) {
+                            percentage = Integer.parseInt(line);
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }.start();
 
         }
 
-        public void processResult(int rc, String path, Object ctx, Stat stat) {
-            decOutstanding();
-            synchronized(statSync) {
-            if (rc != 0) {
-                System.err.println("Got rc = " + rc);
-                errors++;
-            } else {
-                finished++;
-                wlatency += System.currentTimeMillis() - (Long)ctx;
-                writes++;
-            }
-            }
+        public void setReporter(Reporter r) {
+            this.r = r;
         }
-    }
 
-    static class SenderThread extends Thread {
-        SenderThread() {
-            setDaemon(true);
-            start();
+        public void start() {
+            try {
+                r.report("started");
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
         }
-        public void run() {
+
+        public void stop() {
+            alive = false;
+            zkThread.interrupt();
+            sendThread.interrupt();
             try {
-                OutputStream os = s.getOutputStream();
-                finished = 0;
-                errors = 0;
-                while(true) {
-                    Thread.sleep(300);
-                    if (percentage == -1 || (finished == 0 && errors == 0)) {
-                        continue;
-                    }
-                    String report = System.currentTimeMillis() + " " + percentage + " " + finished + " " + errors + " " + outstanding + "\n";
-                    String subreport = reads + " " + (((double)rlatency)/reads) + " " + writes + " " + (((double)wlatency/writes));
-                    synchronized(statSync) {
-                    finished = 0;
-                    errors = 0;
-                    reads = 0;
-                    writes = 0;
-                    rlatency = 0;
-                    wlatency = 0;
-                    }
-                    os.write(report.getBytes());
-                    System.out.println("Reporting " + report + "+" + subreport);
-                }
+                zkThread.join();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            try {
+                sendThread.join();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            try {
+                r.report("stopped");
             } catch (Exception e) {
                 e.printStackTrace();
             }
+            try {
+                s.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+    private static class StatusWatcher implements Watcher {
+        volatile boolean connected;
 
+        public void process(WatchedEvent event) {
+            if (event.getType() == Watcher.Event.EventType.None) {
+                synchronized (this) {
+                    connected = event.getState() == Watcher.Event.KeeperState.SyncConnected;
+                    notifyAll();
+                }
+            }
+        }
+
+        public boolean isConnected() {
+            return connected;
+        }
+
+        synchronized public boolean waitConnected(long timeout)
+                throws InterruptedException {
+            long endTime = System.currentTimeMillis() + timeout;
+            while (!connected && System.currentTimeMillis() < endTime) {
+                wait(endTime - System.currentTimeMillis());
+            }
+            return connected;
         }
     }
 
+    private static boolean leaderOnly;
+    
+    private static String []processOptions(String args[]) {
+        ArrayList<String> newArgs = new ArrayList<String>();
+        for(String a: args) {
+            if (a.equals("--leaderOnly")) {
+                leaderOnly = true;
+            } else {
+                newArgs.add(a);
+            }
+        }
+        return newArgs.toArray(new String[0]);
+    }
+    
     /**
      * @param args
      * @throws InterruptedException
+     * @throws KeeperException
+     * @throws DuplicateNameException
+     * @throws NoAvailableContainers
+     * @throws NoAssignmentException
      */
-    public static void main(String[] args) throws InterruptedException {
-        if (args.length == 1) {
+    public static void main(String[] args) throws InterruptedException,
+            KeeperException, NoAvailableContainers, DuplicateNameException,
+            NoAssignmentException {
+
+        args = processOptions(args);
+        if (args.length == 4) {
             try {
-                int port = Integer.parseInt(args[0]);
-                ss = new ServerSocket(port);
+                StatusWatcher statusWatcher = new StatusWatcher();
+                ZooKeeper zk = new ZooKeeper(args[0], 15000, statusWatcher);
+                if (!statusWatcher.waitConnected(5000)) {
+                    System.err.println("Could not connect to " + args[0]);
+                    return;
+                }
+                InstanceManager im = new InstanceManager(zk, args[1]);
+                ss = new ServerSocket(0);
+                int port = ss.getLocalPort();
+                int serverCount = Integer.parseInt(args[2]);
+                int clientCount = Integer.parseInt(args[3]);
+                StringBuilder quorumHostPort = new StringBuilder();
+                StringBuilder zkHostPort = new StringBuilder();
+                for (int i = 0; i < serverCount; i++) {
+                    String r[] = QuorumPeerInstance.createServer(im, i);
+                    if (i > 0) {
+                        quorumHostPort.append(',');
+                        zkHostPort.append(',');
+                    }
+                    zkHostPort.append(r[0]);
+                    quorumHostPort.append(r[1]);
+                }
+                for (int i = 0; i < serverCount; i++) {
+                    QuorumPeerInstance.startInstance(im, quorumHostPort
+                            .toString(), i);
+                }
+                if (leaderOnly) {
+                    int tries = 0;
+                    outer:
+                        while(true) {
+                            Thread.sleep(1000);
+                            IOException lastException = null;
+                            String parts[] = zkHostPort.toString().split(",");
+                            for(int i = 0; i < parts.length; i++) {
+                                try {
+                                    String mode = getMode(parts[i]);
+                                    if (mode.equals("leader")) {
+                                        zkHostPort = new StringBuilder(parts[i]);
+                                        System.out.println("Connecting exclusively to " + zkHostPort.toString());
+                                        break outer;
+                                    }
+                                } catch(IOException e) {
+                                    lastException = e;
+                                }
+                            }
+                            if (tries++ > 3) {
+                                throw lastException;
+                            }
+                        }
+                }
+                for (int i = 0; i < clientCount; i++) {
+                    im.assignInstance("client" + i, GeneratorInstance.class,
+                            zkHostPort.toString()
+                                    + ' '
+                                    + InetAddress.getLocalHost()
+                                            .getCanonicalHostName() + ':'
+                                    + port, 1);
+                }
                 new AcceptorThread();
                 new ReporterThread();
-                BufferedReader is = new BufferedReader(new InputStreamReader(System.in));
+                BufferedReader is = new BufferedReader(new InputStreamReader(
+                        System.in));
                 String line;
                 while ((line = is.readLine()) != null) {
                     try {
                         String cmdNumber[] = line.split(" ");
-                        if (cmdNumber[0].equals("percentage") && cmdNumber.length > 1) {
+                        if (cmdNumber[0].equals("percentage")
+                                && cmdNumber.length > 1) {
                             int number = Integer.parseInt(cmdNumber[1]);
                             if (number < 0 || number > 100) {
-                                throw new NumberFormatException("must be between 0 and 100");
+                                throw new NumberFormatException(
+                                        "must be between 0 and 100");
                             }
                             sendChange(number);
-                        } else if (cmdNumber[0].equals("sleep") && cmdNumber.length > 1) {
+                        } else if (cmdNumber[0].equals("sleep")
+                                && cmdNumber.length > 1) {
                             int number = Integer.parseInt(cmdNumber[1]);
-                            Thread.sleep(number*1000);
-                        } else if (cmdNumber[0].equals("save") && cmdNumber.length > 1) {
+                            Thread.sleep(number * 1000);
+                        } else if (cmdNumber[0].equals("save")
+                                && cmdNumber.length > 1) {
                             sf = new PrintStream(cmdNumber[1]);
                         } else {
                             System.err.println("Commands must be:");
-                            System.err.println("\tpercentage new_write_percentage");
+                            System.err
+                                    .println("\tpercentage new_write_percentage");
                             System.err.println("\tsleep seconds_to_sleep");
                             System.err.println("\tsave file_to_save_output");
                         }
                     } catch (NumberFormatException e) {
-                        System.out
-                                .println("Not a valid number: " + e.getMessage());
+                        System.out.println("Not a valid number: "
+                                + e.getMessage());
                     }
                 }
             } catch (NumberFormatException e) {
@@ -471,35 +672,29 @@
                 e.printStackTrace();
                 System.exit(2);
             }
-        } else if (args.length == 2) {
-            host = args[1];
-            String hostPort[] = args[0].split(":");
-            try {
-                s = new Socket(hostPort[0], Integer
-                        .parseInt(hostPort[1]));
-                new ZooKeeperThread();
-                new SenderThread();
-                BufferedReader is = new BufferedReader(new InputStreamReader(s.getInputStream()));
-                String line;
-                while((line = is.readLine()) != null) {
-                    percentage = Integer.parseInt(line);
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
         } else {
             doUsage();
         }
 
     }
 
+    private static String getMode(String hostPort) throws NumberFormatException, UnknownHostException, IOException {
+        String parts[] = hostPort.split(":");
+        Socket s = new Socket(parts[0], Integer.parseInt(parts[1]));
+        s.getOutputStream().write("stat".getBytes());
+        BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
+        String line;
+        while((line = br.readLine()) != null) {
+            if (line.startsWith("Mode: ")) {
+                return line.substring(6);
+            }
+        }
+        return "unknown";
+    }
+
     private static void doUsage() {
-        System.err
-                .println("USAGE: "
-                        + GenerateLoad.class.getName()
-                        + " controller_host:port zookeeper_host:port-> connects to a controller");
         System.err.println("USAGE: " + GenerateLoad.class.getName()
-                + " controller_port -> starts a controller");
+                + " [--leaderOnly] zookeeper_host:port containerPrefix #ofServers #ofClients");
         System.exit(2);
     }
 }

Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java Mon Feb  2 22:26:03 2009
@@ -53,7 +53,6 @@
             this.myNode = myNode;
             this.dc = dc;
         }
-        @Override
         public void process(WatchedEvent event) {
             if (event.getPath() != null && event.getPath().equals(myNode)) {
                 zk.getData(myNode, this, dc, this);
@@ -70,7 +69,6 @@
             this.myInstance = myInstance;
             lastVer = ver;
         }
-        @Override
         public void processResult(int rc, String path,
                 Object ctx, byte[] data, Stat stat) {
             if (rc == KeeperException.Code.NONODE.intValue()) {
@@ -95,7 +93,6 @@
             myReportNode = reportsNode + '/' + child;
         }
 
-        @Override
         public void report(String report) throws KeeperException, InterruptedException {
             for(int j = 0; j < maxTries; j++) {
                 try {
@@ -217,7 +214,6 @@
         }
     }
 
-    @Override
     public void process(WatchedEvent event) {
         if (KeeperState.Expired == event.getState()) {
             // It's all over
@@ -231,7 +227,6 @@
     }
 
     HashMap<String, Instance> instances = new HashMap<String, Instance>();
-    @Override
     public void processResult(int rc, String path, Object ctx,
             List<String> children) {
         if (rc != KeeperException.Code.OK.intValue()) {
@@ -288,6 +283,9 @@
                         zk.getData(myNode, watcher, dc, watcher);
                     } catch (Exception e) {
                         LOG.warn("Skipping " + child, e);
+                        if (e.getCause() != null) {
+                            LOG.warn("Caused by", e.getCause());
+                        }
                     }
                     
                 }

Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java Mon Feb  2 22:26:03 2009
@@ -115,7 +115,7 @@
             zk.create(readyNode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         } catch(NodeExistsException e) { /* this is ok */ } 
     }
-    @Override
+    
     synchronized public void processResult(int rc, String path, Object ctx,
             List<String> children) {
         if (rc != KeeperException.Code.OK.intValue()) {
@@ -156,7 +156,7 @@
             zk.delete(deadNode, -1);
         } catch(NoNodeException e) { /* this is ok */ }
     }
-    @Override
+    
     public void process(WatchedEvent event) {
         if (event.getPath().equals(statusNode)) {
             zk.getChildren(statusNode, this, this, null);
@@ -297,7 +297,6 @@
                 synchronized(eventObj) {
                     // wait for the node to appear
                     Stat eStat = zk.exists(reportsNode + '/' + name, new Watcher() {
-                        @Override
                         public void process(WatchedEvent event) {
                             synchronized(eventObj) {
                                 eventObj.notifyAll();

Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java Mon Feb  2 22:26:03 2009
@@ -19,12 +19,14 @@
 package org.apache.zookeeper.test.system;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.HashMap;
+import java.util.Properties;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
@@ -41,7 +43,6 @@
     Reporter r;
     QuorumPeer peer;
 
-    @Override
     public void setReporter(Reporter r) {
         this.r = r;
     }
@@ -49,19 +50,33 @@
     InetSocketAddress clientAddr;
     InetSocketAddress quorumAddr;
     HashMap<Long, QuorumServer> peers;
-    File dir;
+    File snapDir, logDir;
 
     public QuorumPeerInstance() {
         try {
-            dir = File.createTempFile("test", ".dir");
+            File tmpFile = File.createTempFile("test", ".dir");
+            File tmpDir = tmpFile.getParentFile();
+            tmpFile.delete();
+            File zkDirs = new File(tmpDir, "zktmp.cfg");
+            logDir = tmpDir;
+            snapDir = tmpDir;
+            if (zkDirs.exists()) {
+                Properties p = new Properties();
+                p.load(new FileInputStream(zkDirs));
+                logDir = new File(p.getProperty("logDir", tmpDir.getAbsolutePath()));
+                snapDir = new File(p.getProperty("snapDir", tmpDir.getAbsolutePath()));
+            }
+            logDir = File.createTempFile("zktst", ".dir", logDir);
+            logDir.delete();
+            logDir.mkdir();
+            snapDir = File.createTempFile("zktst", ".dir", snapDir);
+            snapDir.delete();
+            snapDir.mkdir();
         } catch (IOException e) {
             e.printStackTrace();
         }
-        dir.delete();
-        dir.mkdir();
     }
 
-    @Override
     public void configure(String params) {
         if (clientAddr == null) {
             // The first time we are configured, it is just to tell
@@ -140,7 +155,7 @@
                     LOG.warn("Peer " + serverId + " already started");
                     return;
                 }
-                peer = new QuorumPeer(peers, dir, dir, clientAddr.getPort(), 0, serverId, tickTime, initLimit, syncLimit);
+                peer = new QuorumPeer(peers, snapDir, logDir, clientAddr.getPort(), 0, serverId, tickTime, initLimit, syncLimit);
                 peer.start();
                 for(int i = 0; i < 5; i++) {
                     Thread.sleep(500);
@@ -157,7 +172,6 @@
         }
     }
 
-    @Override
     public void start() {
     }
 
@@ -171,7 +185,7 @@
         }
         dir.delete();
     }
-    @Override
+    
     public void stop() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Stopping peer " + serverId);
@@ -179,7 +193,12 @@
         if (peer != null) {
             peer.shutdown();
         }
-        recursiveDelete(dir);
+        if (logDir != null) {
+            recursiveDelete(logDir);
+        }
+        if (snapDir != null) {
+            recursiveDelete(snapDir);
+        }
     }
 
     /**
@@ -192,9 +211,9 @@
      * @throws InterruptedException
      * @throws KeeperException
      */
-    public static String createServer(InstanceManager im, int i) throws NoAvailableContainers, DuplicateNameException, InterruptedException, KeeperException {
+    public static String[] createServer(InstanceManager im, int i) throws NoAvailableContainers, DuplicateNameException, InterruptedException, KeeperException {
         im.assignInstance("server"+i, QuorumPeerInstance.class, Integer.toString(i), 50);
-        return im.getStatus("server"+i, 3000);
+        return im.getStatus("server"+i, 3000).split(",");
         
     }
 

Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java Mon Feb  2 22:26:03 2009
@@ -31,7 +31,7 @@
 import org.apache.zookeeper.data.Stat;
 
 /**
- * The client that gets spawned for the SimpleSysTest
+ * The client that gets spawned for the SimpleSysTest 
  *
  */
 public class SimpleClient implements Instance, Watcher, AsyncCallback.DataCallback, StringCallback, StatCallback {
@@ -42,25 +42,23 @@
     transient String myPath;
     byte data[];
     boolean createdEphemeral;
-    @Override
     public void configure(String params) {
         String parts[] = params.split(" ");
         hostPort = parts[1];
         this.index = Integer.parseInt(parts[0]);
         myPath = "/simpleCase/" + index;
     }
-    @Override
+    
     public void start() {
         try {
             zk = new ZooKeeper(hostPort, 15000, this);
             zk.getData("/simpleCase", true, this, null);
-            r.report("Client " + index + " connecting to " + hostPort);
+            r.report("Client " + index + " connecting to " + hostPort); 
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
-
-    @Override
+    
     public void stop() {
         try {
             if (zk != null) {
@@ -70,13 +68,12 @@
             e.printStackTrace();
         }
     }
-    @Override
     public void process(WatchedEvent event) {
         if (event.getPath() != null && event.getPath().equals("/simpleCase")) {
             zk.getData("/simpleCase", true, this, null);
         }
     }
-    @Override
+    
     public void processResult(int rc, String path, Object ctx, byte[] data,
             Stat stat) {
         if (rc != 0) {
@@ -94,15 +91,14 @@
             } else {
                 zk.setData(myPath, data, -1, this, null);
             }
-        }
+        }            
     }
-    @Override
+    
     public void processResult(int rc, String path, Object ctx, String name) {
         if (rc != 0) {
             zk.create(myPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, this, null);
         }
     }
-    @Override
     public void processResult(int rc, String path, Object ctx, Stat stat) {
         if (rc != 0) {
             zk.setData(myPath, data, -1, this, null);
@@ -112,9 +108,8 @@
     public String toString() {
         return SimpleClient.class.getName() + "[" + index + "] using " + hostPort;
     }
-
+    
     Reporter r;
-    @Override
     public void setReporter(Reporter r) {
         this.r = r;
     }

Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java Mon Feb  2 22:26:03 2009
@@ -154,7 +154,6 @@
         stopServers();
     }
 
-    @Override
     public void process(WatchedEvent event) {
         if (event.getState() == KeeperState.SyncConnected) {
             synchronized(this) {

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java?rev=740131&r1=740130&r2=740131&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java Mon Feb  2 22:26:03 2009
@@ -82,9 +82,8 @@
         assertTrue("exactly 3 snapshots ", (numSnaps == 3));
     }
     
-    @Override
     public void process(WatchedEvent event) {
         // do nothing
     }
     
-}
\ No newline at end of file
+}



Mime
View raw message