Subject: svn commit: r746067 - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/persistence/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/main/org/apache/zookeeper/server/util/ src...
Date: Fri, 20 Feb 2009 00:23:40 -0000
To: zookeeper-commits@hadoop.apache.org
From: mahadev@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20090220002341.B82982388B36@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: mahadev
Date: Fri Feb 20 00:23:40 2009
New Revision: 746067

URL: http://svn.apache.org/viewvc?rev=746067&view=rev
Log:
ZOOKEEPER-308. improve the atomic broadcast performance 3x. (breed via mahadev)

Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Fri Feb 20 00:23:40 2009 @@ -10,6 +10,8 @@ ZOOKEEPER-303. Bin scripts dont work on a Mac. (tom white via mahadev) IMPROVEMENTS: + ZOOKEEPER-308. improve the atomic broadcast performance 3x. (breed via +mahadev) NEW FEATURES: Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Fri Feb 20 00:23:40 2009 @@ -88,7 +88,10 @@ + zks.outstandingChanges.get(0).zxid + " is less than current " + request.zxid); } - zks.outstandingChanges.remove(0); + ZooKeeperServer.ChangeRecord cr = zks.outstandingChanges.remove(0); + if (zks.outstandingChangesForPath.get(cr.path) == cr) { + zks.outstandingChangesForPath.remove(cr.path); + } } if (request.hdr != null) { rc = zks.dataTree.processTxn(request.hdr, request.txn); Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=746067&r1=746066&r2=746067&view=diff 
============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Fri Feb 20 00:23:40 2009 @@ -91,6 +91,7 @@ super("NIOServerCxn.Factory:" + port); setDaemon(true); this.ss = ServerSocketChannel.open(); + ss.socket().setReuseAddress(true); ss.socket().bind(new InetSocketAddress(port)); ss.configureBlocking(false); ss.register(selector, SelectionKey.OP_ACCEPT); @@ -268,6 +269,19 @@ LinkedList outstanding = new LinkedList(); void sendBuffer(ByteBuffer bb) { + // We check if write interest here because if it is NOT set, nothing is queued, so + // we can try to send the buffer right away without waking up the selector + if ((sk.interestOps()&SelectionKey.OP_WRITE) == 0) { + try { + sock.write(bb); + } catch (IOException e) { + // we are just doing best effort right now + } + } + // if there is nothing left to send, we are done + if (bb.remaining() == 0) { + return; + } synchronized (factory) { sk.selector().wakeup(); if (LOG.isTraceEnabled()) { @@ -471,7 +485,7 @@ outstandingRequests++; // check throttling if (zk.getInProcess() > factory.outstandingLimit) { - LOG.warn("Throttling recv " + zk.getInProcess()); + LOG.debug("Throttling recv " + zk.getInProcess()); disableRecv(); // following lines should not be needed since we are already // reading Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original) +++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Fri Feb 20 00:23:40 2009 @@ -114,12 +114,15 @@ ChangeRecord getRecordForPath(String path) throws KeeperException.NoNodeException { ChangeRecord lastChange = null; synchronized (zks.outstandingChanges) { + lastChange = zks.outstandingChangesForPath.get(path); + /* for (int i = 0; i < zks.outstandingChanges.size(); i++) { ChangeRecord c = zks.outstandingChanges.get(i); if (c.path.equals(path)) { lastChange = c; } } + */ if (lastChange == null) { DataNode n = zks.dataTree.getNode(path); if (n != null) { @@ -137,6 +140,7 @@ void addChangeRecord(ChangeRecord c) { synchronized (zks.outstandingChanges) { zks.outstandingChanges.add(c); + zks.outstandingChangesForPath.put(c.path, c); } } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Fri Feb 20 00:23:40 2009 @@ -18,6 +18,7 @@ package org.apache.zookeeper.server; +import java.io.Flushable; import java.io.IOException; import java.util.LinkedList; import java.util.Random; @@ -65,6 +66,7 @@ @Override public void run() { try { + int randRoll = r.nextInt(snapCount/2); while (true) { Request si = null; if (toFlush.isEmpty()) { @@ -82,8 +84,8 @@ if (si != null) { zks.getLogWriter().append(si); logCount++; - if (logCount > snapCount / 2 - && r.nextInt(snapCount / 2) == 0) { + if (logCount > (snapCount / 2 + randRoll)) { + randRoll = r.nextInt(snapCount/2); // roll the log zks.getLogWriter().rollLog(); // take a snapshot @@ -126,6 +128,9 @@ Request i = 
toFlush.remove(); nextProcessor.processRequest(i); } + if (nextProcessor instanceof Flushable) { + ((Flushable)nextProcessor).flush(); + } } public void shutdown() { Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Fri Feb 20 00:23:40 2009 @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Random; @@ -118,6 +119,9 @@ int requestsInProcess; List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>(); + // this data structure must be accessed under the outstandingChanges lock + HashMap<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>(); + private NIOServerCnxn.Factory serverCnxnFactory; private final ServerStats serverStats; @@ -571,7 +575,7 @@ try { return Integer.parseInt(sc); } catch (Exception e) { - return 10000; + return 100000; } } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java Fri Feb 20 00:23:40 2009 @@ -18,6 +18,7 @@ package org.apache.zookeeper.server.persistence; import
java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.File; @@ -49,8 +50,10 @@ */ public class FileTxnLog implements TxnLog { long lastZxidSeen; - volatile FileOutputStream logStream = null; + volatile BufferedOutputStream logStream = null; volatile OutputArchive oa; + volatile FileOutputStream fos = null; + File logDir; public final static int TXNLOG_MAGIC = @@ -104,10 +107,14 @@ /** * rollover the current log file to a new one. + * @throws IOException */ - public void rollLog() { - this.logStream = null; - oa = null; + public void rollLog() throws IOException { + if (logStream != null) { + this.logStream.flush(); + this.logStream = null; + oa = null; + } } /** @@ -123,17 +130,18 @@ + " is <= " + lastZxidSeen + " for " + hdr.getType()); } - if (logStream==null) { + if (logStream==null) { logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid()))); - logStream=new FileOutputStream(logFileWrite); + fos = new FileOutputStream(logFileWrite); + logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); fhdr.serialize(oa, "fileheader"); - currentSize = logStream.getChannel().position(); - streamsToFlush.add(logStream); + currentSize = fos.getChannel().position(); + streamsToFlush.add(fos); } - padFile(logStream); + padFile(fos); byte[] buf = Util.marshallTxnEntry(hdr, txn); if (buf == null || buf.length == 0) { throw new IOException("Faulty serialization for header " + @@ -229,6 +237,9 @@ * disk */ public synchronized void commit() throws IOException { + if (logStream != null) { + logStream.flush(); + } for (FileOutputStream log : streamsToFlush) { log.flush(); if (forceSync) { Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java Fri Feb 20 00:23:40 2009 @@ -268,8 +268,9 @@ /** * roll the transaction logs + * @throws IOException */ - public void rollLog() { + public void rollLog() throws IOException { txnLog.rollLog(); } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java Fri Feb 20 00:23:40 2009 @@ -32,8 +32,9 @@ /** * roll the current * log being appended to + * @throws IOException */ - void rollLog(); + void rollLog() throws IOException; /** * Append a request to the transaction log * @param hdr the transaction header Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original) +++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Fri Feb 20 00:23:40 2009 @@ -180,7 +180,9 @@ queuedRequests.clear(); notifyAll(); } - nextProcessor.shutdown(); + if (nextProcessor != null) { + nextProcessor.shutdown(); + } } } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Fri Feb 20 00:23:40 2009 @@ -43,6 +43,7 @@ import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.txn.SetDataTxn; import org.apache.zookeeper.txn.TxnHeader; /** @@ -51,6 +52,11 @@ public class Follower { private static final Logger LOG = Logger.getLogger(Follower.class); + static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); + static { + LOG.info("TCP NoDelay set to: " + nodelay); + } + QuorumPeer self; FollowerZooKeeperServer zk; @@ -85,15 +91,14 @@ * the proposal packet to be sent to the leader * @throws IOException */ - void writePacket(QuorumPacket pp) throws IOException { - long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK; - if (pp.getType() == Leader.PING) { - traceMask = ZooTrace.SERVER_PING_TRACE_MASK; - } - ZooTrace.logQuorumPacket(LOG, traceMask, 'o', pp); + void writePacket(QuorumPacket pp, boolean flush) throws IOException { synchronized (leaderOs) { - leaderOs.writeRecord(pp, "packet"); - bufferedOutput.flush(); + if (pp != null) { + leaderOs.writeRecord(pp, "packet"); + } + if 
(flush) { + bufferedOutput.flush(); + } } } @@ -147,7 +152,7 @@ //sock = new Socket(); //sock.setSoTimeout(self.tickTime * self.initLimit); sock.connect(addr, self.tickTime * self.syncLimit); - sock.setTcpNoDelay(true); + sock.setTcpNoDelay(nodelay); break; } catch (ConnectException e) { if (tries == 4) { @@ -169,7 +174,7 @@ qp.setType(Leader.LASTZXID); long sentLastZxid = self.getLastLoggedZxid(); qp.setZxid(sentLastZxid); - writePacket(qp); + writePacket(qp, true); readPacket(qp); long newLeaderZxid = qp.getZxid(); @@ -214,7 +219,7 @@ zk.dataTree.lastProcessedZxid = newLeaderZxid; } ack.setZxid(newLeaderZxid & ~0xffffffffL); - writePacket(ack); + writePacket(ack, true); sock.setSoTimeout(self.tickTime * self.syncLimit); zk.startup(); while (self.running) { @@ -231,7 +236,7 @@ dos.writeInt(entry.getValue()); } qp.setData(bos.toByteArray()); - writePacket(qp); + writePacket(qp, true); break; case Leader.PROPOSAL: TxnHeader hdr = new TxnHeader(); @@ -334,7 +339,7 @@ ZooTrace.SESSION_TRACE_MASK, "To validate session 0x" + Long.toHexString(clientId)); - writePacket(qp); + writePacket(qp, true); } /** @@ -370,7 +375,7 @@ // qp = new QuorumPacket(Leader.REQUEST, -1, baos // .toByteArray(), request.authInfo); // } - writePacket(qp); + writePacket(qp, true); } public long getZxid() { Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java Fri Feb 20 00:23:40 2009 @@ -97,20 +97,23 @@ private void sendPackets() throws InterruptedException { long traceMask = 
ZooTrace.SERVER_PACKET_TRACE_MASK; while (true) { - QuorumPacket p; - p = queuedPackets.take(); - - if (p == proposalOfDeath) { - // Packet of death! - break; - } - if (p.getType() == Leader.PING) { - traceMask = ZooTrace.SERVER_PING_TRACE_MASK; - } - ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p); try { + QuorumPacket p; + p = queuedPackets.poll(); + if (p == null) { + bufferedOutput.flush(); + p = queuedPackets.take(); + } + + if (p == proposalOfDeath) { + // Packet of death! + break; + } + if (p.getType() == Leader.PING) { + traceMask = ZooTrace.SERVER_PING_TRACE_MASK; + } + ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p); oa.writeRecord(p, "packet"); - bufferedOutput.flush(); } catch (IOException e) { if (!sock.isClosed()) { LOG.warn("Unexpected exception",e); Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Fri Feb 20 00:23:40 2009 @@ -26,11 +26,14 @@ import java.net.SocketAddress; import java.net.SocketException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import org.apache.jute.BinaryOutputArchive; import org.apache.log4j.Logger; @@ -43,6 +46,11 @@ */ public class Leader { private static final Logger LOG = Logger.getLogger(Leader.class); + + static final private boolean nodelay = System.getProperty("leader.nodelay", "true").equals("true"); 
+ static { + LOG.info("TCP NoDelay set to: " + nodelay); + } static public class Proposal { public QuorumPacket packet; @@ -191,7 +199,7 @@ */ final static int SYNC = 7; - private ConcurrentLinkedQueue<Proposal> outstandingProposals = new ConcurrentLinkedQueue<Proposal>(); + private ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>(); ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>(); @@ -207,7 +215,7 @@ try{ Socket s = ss.accept(); s.setSoTimeout(self.tickTime * self.syncLimit); - s.setTcpNoDelay(true); + s.setTcpNoDelay(nodelay); new FollowerHandler(s, Leader.this); } catch (SocketException e) { if (stop) { @@ -257,7 +265,7 @@ LOG.info("NEWLEADER proposal has Zxid of " + Long.toHexString(newLeaderProposal.packet.getZxid())); } - outstandingProposals.add(newLeaderProposal); + outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal); // Start thread that waits for connection requests from // new followers. @@ -341,7 +349,9 @@ LOG.info("Shutdown called", new Exception("shutdown Leader!
reason: " + reason)); - cnxAcceptor.halt(); + if (cnxAcceptor != null) { + cnxAcceptor.halt(); + } // NIO should not accept conenctions self.cnxnFactory.setZooKeeperServer(null); @@ -380,7 +390,7 @@ if (LOG.isDebugEnabled()) { LOG.debug("Ack zxid: 0x" + Long.toHexString(zxid)); - for (Proposal p : outstandingProposals) { + for (Proposal p : outstandingProposals.values()) { long packetZxid = p.packet.getZxid(); LOG.debug("outstanding proposal: 0x" + Long.toHexString(packetZxid)); @@ -394,57 +404,56 @@ } return; } - if (outstandingProposals.peek().packet.getZxid() > zxid) { + if (lastCommitted >= zxid) { if (LOG.isDebugEnabled()) { LOG.debug("proposal has already been committed, pzxid:" - + outstandingProposals.peek().packet.getZxid() - + " zxid:" + zxid); + + lastCommitted + + " zxid: 0x" + Long.toHexString(zxid)); } // The proposal has already been committed return; } - for (Proposal p : outstandingProposals) { - long packetZxid = p.packet.getZxid(); - if (packetZxid == zxid) { - p.ackCount++; - if (LOG.isDebugEnabled()) { - LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid) - + " is " + p.ackCount); - } + Proposal p = outstandingProposals.get(zxid); + if (p == null) { + LOG.warn("Trying to commit future proposal: zxid 0x" + + Long.toHexString(zxid) + " from " + followerAddr); + return; + } + p.ackCount++; + if (LOG.isDebugEnabled()) { + LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid) + + " is " + p.ackCount); + } - if (p.ackCount > self.quorumPeers.size() / 2){ - if (!first) { - LOG.fatal("Commiting zxid 0x" + Long.toHexString(zxid) - + " from " + followerAddr + " not first!"); - LOG.fatal("First is " - + outstandingProposals.element().packet); - System.exit(13); - } - outstandingProposals.remove(); - if (p.request != null) { - toBeApplied.add(p); - } - // We don't commit the new leader proposal - if ((zxid & 0xffffffffL) != 0) { - if (p.request == null) { - LOG.warn("Going to commmit null: " + p); - } - commit(zxid); - 
zk.commitProcessor.commit(p.request); - if(pendingSyncs.containsKey(zxid)){ - for(FollowerSyncRequest r: pendingSyncs.remove(zxid)) { - sendSync(r); - } - } + if (p.ackCount > self.quorumPeers.size() / 2){ + if (zxid != lastCommitted+1) { + LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid) + + " from " + followerAddr + " not first!"); + LOG.warn("First is " + + (lastCommitted+1)); + //System.exit(13); + } + outstandingProposals.remove(zxid); + if (p.request != null) { + toBeApplied.add(p); + } + // We don't commit the new leader proposal + if ((zxid & 0xffffffffL) != 0) { + if (p.request == null) { + LOG.warn("Going to commmit null: " + p); } + commit(zxid); + zk.commitProcessor.commit(p.request); + if(pendingSyncs.containsKey(zxid)){ + for(FollowerSyncRequest r: pendingSyncs.remove(zxid)) { + sendSync(r); + } } return; } else { - first = false; + lastCommitted = zxid; } } - LOG.warn("Trying to commit future proposal: zxid 0x" - + Long.toHexString(zxid) + " from " + followerAddr); } static class ToBeAppliedRequestProcessor implements RequestProcessor { @@ -514,7 +523,7 @@ } } - long lastCommitted; + long lastCommitted = -1; /** * Create a commit packet and send it to all the members of the quorum @@ -537,7 +546,6 @@ */ public Proposal propose(Request request) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); try { @@ -549,8 +557,8 @@ } catch (IOException e) { LOG.warn("This really should be impossible", e); } - QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, baos - .toByteArray(), null); + QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, + baos.toByteArray(), null); Proposal p = new Proposal(); p.packet = pp; @@ -560,8 +568,8 @@ LOG.debug("Proposing:: " + request); } - outstandingProposals.add(p); lastProposed = p.packet.getZxid(); + outstandingProposals.put(lastProposed, p); sendPacket(pp); } return p; @@ -622,11 +630,13 @@ .getZxid(), null, null); 
handler.queuePacket(qp); } - for (Proposal p : outstandingProposals) { - if (p.packet.getZxid() <= lastSeenZxid) { + List<Long> zxids = new ArrayList<Long>(outstandingProposals.keySet()); + Collections.sort(zxids); + for (Long zxid: zxids) { + if (zxid <= lastSeenZxid) { continue; } - handler.queuePacket(p.packet); + handler.queuePacket(outstandingProposals.get(zxid).packet); } } synchronized (forwardingFollowers) { Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java?rev=746067&r1=746066&r2=746067&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java Fri Feb 20 00:23:40 2009 @@ -18,6 +18,7 @@ package org.apache.zookeeper.server.quorum; +import java.io.Flushable; import java.io.IOException; import org.apache.log4j.Logger; @@ -26,7 +27,7 @@ import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; -public class SendAckRequestProcessor implements RequestProcessor { +public class SendAckRequestProcessor implements RequestProcessor, Flushable { private static final Logger LOG = Logger.getLogger(SendAckRequestProcessor.class); Follower follower; @@ -40,12 +41,16 @@ QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null, null); try { - follower.writePacket(qp); + follower.writePacket(qp, false); } catch (IOException e) { LOG.warn("Ignoring unexpected exception during packet send", e); } } } + + public void flush() throws IOException { + follower.writePacket(null, true); + } public void shutdown() { // Nothing needed Modified:
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java Fri Feb 20 00:23:40 2009
@@ -78,17 +78,25 @@

     public static void deserializeSnapshot(DataTree dt,InputArchive ia,
             Map sessions) throws IOException {
-        int count = ia.readInt("count");
-        while (count > 0) {
-            long id = ia.readLong("id");
-            int to = ia.readInt("timeout");
-            sessions.put(id, to);
-            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
-                    "loadData --- session in archive: " + id
-                    + " with timeout: " + to);
-            count--;
+        try {
+            int count = ia.readInt("count");
+            while (count > 0) {
+                long id = ia.readLong("id");
+                int to = ia.readInt("timeout");
+                sessions.put(id, to);
+                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
+                        "loadData --- session in archive: " + id
+                        + " with timeout: " + to);
+                count--;
+            }
+            dt.deserialize(ia, "tree");
+        } catch(IOException e) {
+            throw e;
+        } catch(Exception e) {
+            IOException ioe = new IOException(e.getMessage());
+            ioe.initCause(e);
+            throw ioe;
         }
-        dt.deserialize(ia, "tree");
     }

     public static void serializeSnapshot(DataTree dt,OutputArchive oa,

Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java Fri Feb 20 00:23:40 2009
@@ -63,10 +63,10 @@

     static Map totalByTime = new HashMap();

-    static long currentInterval;
+    volatile static long currentInterval;

     static long lastChange;
-    
+
     static PrintStream sf;
     static PrintStream tf;
     static {
@@ -208,7 +208,7 @@
                 count = count * 1000 / INTERVAL; // Multiply by 1000 to get
                                                  // reqs/sec
                 if (lastChange != 0
-                        && (lastChange + INTERVAL * 4 + 5000) < now) {
+                        && (lastChange + INTERVAL * 3) < now) {

                     // We only want to print anything if things have had a
                     // chance to change
@@ -264,6 +264,8 @@

     static public class GeneratorInstance implements Instance {

+        byte bytes[];
+
         int percentage = -1;

         int errors;
@@ -319,7 +321,6 @@

         public void run() {
             try {
-                byte bytes[] = new byte[1024];
                 zk = new ZooKeeper(host, 60000, this);
                 synchronized (this) {
                     if (!connected) {
@@ -461,6 +462,15 @@
             try {
                 String parts[] = params.split(" ");
                 String hostPort[] = parts[1].split(":");
+                int bytesSize = 1024;
+                if (parts.length == 3) {
+                    try {
+                        bytesSize = Integer.parseInt(parts[2]);
+                    } catch(Exception e) {
+                        System.err.println("Not an integer: " + parts[2]);
+                    }
+                }
+                bytes = new byte[bytesSize];
                 s = new Socket(hostPort[0], Integer.parseInt(hostPort[1]));
                 zkThread = new ZooKeeperThread(parts[0]);
                 sendThread = new SenderThread(s);
@@ -545,12 +555,16 @@
     }

     private static boolean leaderOnly;
+    private static boolean leaderServes;

     private static String []processOptions(String args[]) {
         ArrayList newArgs = new ArrayList();
         for(String a: args) {
             if (a.equals("--leaderOnly")) {
                 leaderOnly = true;
+                leaderServes = true;
+            } else if (a.equals("--leaderServes")) {
+                leaderServes = true;
             } else {
                 newArgs.add(a);
             }
@@ -571,7 +585,7 @@
             NoAssignmentException {
         args = processOptions(args);

-        if (args.length == 4) {
+        if (args.length == 5) {
             try {
                 StatusWatcher statusWatcher = new StatusWatcher();
                 ZooKeeper zk = new ZooKeeper(args[0], 15000, statusWatcher);
@@ -587,7 +601,7 @@
                 StringBuilder quorumHostPort = new StringBuilder();
                 StringBuilder zkHostPort = new StringBuilder();
                 for (int i = 0; i < serverCount; i++) {
-                    String r[] = QuorumPeerInstance.createServer(im, i);
+                    String r[] = QuorumPeerInstance.createServer(im, i, leaderServes);
                     if (i > 0) {
                         quorumHostPort.append(',');
                         zkHostPort.append(',');
@@ -694,7 +708,7 @@

     private static void doUsage() {
         System.err.println("USAGE: " + GenerateLoad.class.getName()
-                + " [--leaderOnly] zookeeper_host:port containerPrefix #ofServers #ofClients");
+                + " [--leaderOnly] [--leaderServes] zookeeper_host:port containerPrefix #ofServers #ofClients requestSize");
         System.exit(2);
     }
 }

Modified: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java (original)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java Fri Feb 20 00:23:40 2009
@@ -60,18 +60,21 @@
             File zkDirs = new File(tmpDir, "zktmp.cfg");
             logDir = tmpDir;
             snapDir = tmpDir;
+            Properties p;
             if (zkDirs.exists()) {
-                Properties p = new Properties();
+                p = new Properties();
                 p.load(new FileInputStream(zkDirs));
-                logDir = new File(p.getProperty("logDir", tmpDir.getAbsolutePath()));
-                snapDir = new File(p.getProperty("snapDir", tmpDir.getAbsolutePath()));
+            } else {
+                p = System.getProperties();
             }
+            logDir = new File(p.getProperty("logDir", tmpDir.getAbsolutePath()));
+            snapDir = new File(p.getProperty("snapDir", tmpDir.getAbsolutePath()));
             logDir = File.createTempFile("zktst", ".dir", logDir);
             logDir.delete();
-            logDir.mkdir();
+            logDir.mkdirs();
             snapDir = File.createTempFile("zktst", ".dir", snapDir);
             snapDir.delete();
-            snapDir.mkdir();
+            snapDir.mkdirs();
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -79,12 +82,18 @@

     public void configure(String params) {
         if (clientAddr == null) {
+            String parts[] = params.split(" ");
             // The first time we are configured, it is just to tell
             // us which machine we are
-            serverId = Integer.parseInt(params);
+            serverId = Integer.parseInt(parts[0]);
             if (LOG.isDebugEnabled()) {
                 LOG.info("Setting up server " + serverId);
             }
+            if (parts.length > 1 && parts[1].equals("false")) {
+                System.setProperty("zookeeper.leaderServes", "no");
+            } else {
+                System.setProperty("zookeeper.leaderServes", "yes");
+            }
             // Let's grab two ports
             try {
                 ServerSocket ss = new ServerSocket(0, 1, InetAddress.getLocalHost());
@@ -155,6 +164,7 @@
             LOG.warn("Peer " + serverId + " already started");
             return;
         }
+        System.err.println("SnapDir = " + snapDir + " LogDir = " + logDir);
         peer = new QuorumPeer(peers, snapDir, logDir, clientAddr.getPort(), 0, serverId, tickTime, initLimit, syncLimit);
         peer.start();
         for(int i = 0; i < 5; i++) {
@@ -212,7 +222,22 @@
      * @throws KeeperException
      */
     public static String[] createServer(InstanceManager im, int i) throws NoAvailableContainers, DuplicateNameException, InterruptedException, KeeperException {
-        im.assignInstance("server"+i, QuorumPeerInstance.class, Integer.toString(i), 50);
+        return createServer(im, i, true);
+    }
+
+    /**
+     * This method is used to configure a QuorumPeerInstance
+     *
+     * @param im the InstanceManager that will be managing the new instance
+     * @param i the server number to configure (should be zero based)
+     * @param leaderServes if false, the leader will not accept client connections
+     * @throws NoAvailableContainers
+     * @throws DuplicateNameException
+     * @throws InterruptedException
+     * @throws KeeperException
+     */
+    public static String[] createServer(InstanceManager im, int i, boolean leaderServes) throws NoAvailableContainers, DuplicateNameException, InterruptedException, KeeperException {
+        im.assignInstance("server"+i, QuorumPeerInstance.class, Integer.toString(i) + " " + leaderServes, 50);
         return im.getStatus("server"+i, 3000).split(",");
     }


Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java Fri Feb 20 00:23:40 2009
@@ -43,7 +43,7 @@
     ArrayList threads;
     File tmpdir[];
     int port[];
-    int[] round;
+    volatile int [] round;

     @Override
     public void setUp() throws Exception {

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java Fri Feb 20 00:23:40 2009
@@ -44,7 +44,7 @@

     private static String HOSTPORT = "127.0.0.1:2344";

-    private CountDownLatch startSignal;
+    private volatile CountDownLatch startSignal;

     @Override
     protected void setUp() throws Exception {
@@ -86,7 +86,11 @@
                 ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));


+        startSignal = new CountDownLatch(1);
         ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+        startSignal.await(CONNECTION_TIMEOUT,
+                TimeUnit.MILLISECONDS);
+        assertTrue("count == 0", startSignal.getCount() == 0);
         String path;
         LOG.info("starting creating nodes");
         for (int i = 0; i < 10; i++) {
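
The RecoveryTest hunk creates a fresh CountDownLatch before constructing the client, waits on it with a timeout, and then asserts that the count reached zero. The field is also made volatile because it is written by the test thread and, presumably, counted down by the test's watcher callback on the client's event thread (that callback is not part of this diff). A minimal, self-contained Java sketch of the same pattern follows. It has no ZooKeeper dependency: FakeClient, ConnectionWatcher and connectAndWait are hypothetical names, and the asynchronous "connected" notification is simulated with a plain thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of the create-latch / connect / await-with-timeout pattern.
 * FakeClient is a hypothetical stand-in for an asynchronous client that
 * reports "connected" to its watcher from another thread.
 */
public class LatchConnectSketch {

    /** Hypothetical callback interface standing in for a Watcher. */
    interface ConnectionWatcher {
        void connected();
    }

    /** Hypothetical client that "connects" after delayMillis on a daemon thread. */
    static class FakeClient {
        FakeClient(long delayMillis, ConnectionWatcher watcher) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    return; // give up quietly if interrupted
                }
                watcher.connected();
            });
            t.setDaemon(true); // do not keep the JVM alive for a late callback
            t.start();
        }
    }

    // volatile: written by the calling thread, read by the callback thread.
    private volatile CountDownLatch startSignal;

    /** Returns true if the connected callback arrived within timeoutMillis. */
    boolean connectAndWait(long delayMillis, long timeoutMillis)
            throws InterruptedException {
        // Create the latch before the client so an early callback finds it.
        startSignal = new CountDownLatch(1);
        new FakeClient(delayMillis, () -> startSignal.countDown());
        // Timed await: returns (false) once the timeout expires.
        startSignal.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return startSignal.getCount() == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        LatchConnectSketch s = new LatchConnectSketch();
        System.out.println(s.connectAndWait(50, 5000));  // prints true
        System.out.println(s.connectAndWait(5000, 100)); // prints false
    }
}
```

Creating the latch before the client matters because the callback may fire before the constructor returns. Since await(long, TimeUnit) simply returns when the timeout expires, the explicit getCount() check (as in the test's assertTrue) is what turns a missed connection into a failure.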