Return-Path: Delivered-To: apmail-hadoop-zookeeper-commits-archive@minotaur.apache.org Received: (qmail 84479 invoked from network); 18 Dec 2009 02:21:11 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 18 Dec 2009 02:21:11 -0000 Received: (qmail 69888 invoked by uid 500); 18 Dec 2009 02:21:11 -0000 Delivered-To: apmail-hadoop-zookeeper-commits-archive@hadoop.apache.org Received: (qmail 69861 invoked by uid 500); 18 Dec 2009 02:21:11 -0000 Mailing-List: contact zookeeper-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: zookeeper-dev@ Delivered-To: mailing list zookeeper-commits@hadoop.apache.org Received: (qmail 69851 invoked by uid 99); 18 Dec 2009 02:21:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Dec 2009 02:21:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Dec 2009 02:21:00 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1F37E2388996; Fri, 18 Dec 2009 02:20:38 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r892111 - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/persistence/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/ Date: Fri, 18 Dec 2009 02:20:11 -0000 To: zookeeper-commits@hadoop.apache.org From: mahadev@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091218022038.1F37E2388996@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mahadev Date: Fri Dec 18 02:19:50 2009 New Revision: 892111 URL: 
http://svn.apache.org/viewvc?rev=892111&view=rev Log: ZOOKEEPER-596. The last logged zxid calculated by zookeeper servers could cause problems in leader election if data gets corrupted. (mahadev) Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ZkDatabaseCorruptionTest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Fri Dec 18 02:19:50 2009 @@ -187,6 +187,9 @@ ZOOKEEPER-600. TODO pondering about allocation behavior in zkpython may be removed (gustavo via mahadev) + ZOOKEEPER-596. The last logged zxid calculated by zookeeper servers could + cause problems in leader election if data gets corrupted. (mahadev) + IMPROVEMENTS: ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to "socket reuse" and failure to close client (phunt via mahadev) Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java Fri Dec 18 02:19:50 2009 @@ -73,8 +73,8 @@ } public String[] getEphemeralNodes() { - if(zk.dataTree!=null){ - String[] res=zk.dataTree.getEphemerals(connection.getSessionId()) + if(zk.getZKDatabase() !=null){ + String[] res= zk.getZKDatabase().getEphemerals(connection.getSessionId()) .toArray(new String[0]); Arrays.sort(res); return res; Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=892111&r1=892110&r2=892111&view=diff 
============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Fri Dec 18 02:19:50 2009 @@ -99,7 +99,7 @@ } } if (request.hdr != null) { - rc = zks.dataTree.processTxn(request.hdr, request.txn); + rc = zks.getZKDatabase().processTxn(request.hdr, request.txn); if (request.type == OpCode.createSession) { if (request.txn instanceof CreateSessionTxn) { CreateSessionTxn cst = (CreateSessionTxn) request.txn; @@ -116,7 +116,7 @@ } // do not add non quorum packets to the queue. if (Request.isQuorum(request.type)) { - zks.addCommittedProposal(request); + zks.getZKDatabase().addCommittedProposal(request); } } @@ -168,7 +168,7 @@ request.createTime, System.currentTimeMillis()); cnxn.sendResponse(new ReplyHeader(-2, - zks.dataTree.lastProcessedZxid, 0), null, "response"); + zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response"); return; } case OpCode.createSession: { @@ -229,7 +229,7 @@ if (path.indexOf('\0') != -1) { throw new KeeperException.BadArgumentsException(); } - Stat stat = zks.dataTree.statNode(path, existsRequest + Stat stat = zks.getZKDatabase().statNode(path, existsRequest .getWatch() ? 
cnxn : null); rsp = new ExistsResponse(stat); break; @@ -239,7 +239,7 @@ GetDataRequest getDataRequest = new GetDataRequest(); ZooKeeperServer.byteBuffer2Record(request.request, getDataRequest); - DataNode n = zks.dataTree.getNode(getDataRequest.getPath()); + DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath()); if (n == null) { throw new KeeperException.NoNodeException(); } @@ -247,11 +247,11 @@ synchronized(n) { aclL = n.acl; } - PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(aclL), + PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL), ZooDefs.Perms.READ, request.authInfo); Stat stat = new Stat(); - byte b[] = zks.dataTree.getData(getDataRequest.getPath(), stat, + byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); rsp = new GetDataResponse(b, stat); break; @@ -263,7 +263,7 @@ request.request.rewind(); ZooKeeperServer.byteBuffer2Record(request.request, setWatches); long relativeZxid = setWatches.getRelativeZxid(); - zks.dataTree.setWatches(relativeZxid, + zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), setWatches.getChildWatches(), cnxn); @@ -276,7 +276,7 @@ getACLRequest); Stat stat = new Stat(); List acl = - zks.dataTree.getACL(getACLRequest.getPath(), stat); + zks.getZKDatabase().getACL(getACLRequest.getPath(), stat); rsp = new GetACLResponse(acl, stat); break; } @@ -285,18 +285,19 @@ GetChildrenRequest getChildrenRequest = new GetChildrenRequest(); ZooKeeperServer.byteBuffer2Record(request.request, getChildrenRequest); - DataNode n = zks.dataTree.getNode(getChildrenRequest.getPath()); + DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath()); if (n == null) { throw new KeeperException.NoNodeException(); } Long aclG; synchronized(n) { aclG = n.acl; + } - PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(aclG), + PrepRequestProcessor.checkACL(zks, 
zks.getZKDatabase().convertLong(aclG), ZooDefs.Perms.READ, request.authInfo); - List children = zks.dataTree.getChildren( + List children = zks.getZKDatabase().getChildren( getChildrenRequest.getPath(), null, getChildrenRequest .getWatch() ? cnxn : null); rsp = new GetChildrenResponse(children); @@ -308,7 +309,7 @@ ZooKeeperServer.byteBuffer2Record(request.request, getChildren2Request); Stat stat = new Stat(); - DataNode n = zks.dataTree.getNode(getChildren2Request.getPath()); + DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath()); if (n == null) { throw new KeeperException.NoNodeException(); } @@ -316,10 +317,10 @@ synchronized(n) { aclG = n.acl; } - PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(aclG), + PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG), ZooDefs.Perms.READ, request.authInfo); - List children = zks.dataTree.getChildren( + List children = zks.getZKDatabase().getChildren( getChildren2Request.getPath(), stat, getChildren2Request .getWatch() ? 
cnxn : null); rsp = new GetChildren2Response(children, stat); Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Fri Dec 18 02:19:50 2009 @@ -687,13 +687,13 @@ if (zk == null) { throw new IOException("ZooKeeperServer not running"); } - if (connReq.getLastZxidSeen() > zk.dataTree.lastProcessedZxid) { + if (connReq.getLastZxidSeen() > zk.getZKDatabase().getDataTreeLastProcessedZxid()) { String msg = "Refusing session request for client " + sock.socket().getRemoteSocketAddress() + " as it has seen zxid 0x" + Long.toHexString(connReq.getLastZxidSeen()) + " our last zxid is 0x" - + Long.toHexString(zk.dataTree.lastProcessedZxid) + + Long.toHexString(zk.getZKDatabase().getDataTreeLastProcessedZxid()) + " client must try another server"; LOG.info(msg); @@ -800,7 +800,7 @@ sb.append("SessionTracker dump: \n"); sb.append(zk.sessionTracker.toString()).append("\n"); sb.append("ephemeral nodes dump:\n"); - sb.append(zk.dataTree.dumpEphemerals()).append("\n"); + sb.append(zk.getZKDatabase().dumpEphemerals()).append("\n"); sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); } k.interestOps(SelectionKey.OP_WRITE); @@ -825,7 +825,7 @@ sb.append("\n"); } sb.append(zk.serverStats().toString()); - sb.append("Node count: ").append(zk.dataTree.getNodeCount()). + sb.append("Node count: ").append(zk.getZKDatabase().getNodeCount()). 
append("\n"); } else { sb.append("ZooKeeperServer not running\n"); Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Fri Dec 18 02:19:50 2009 @@ -132,7 +132,7 @@ } */ if (lastChange == null) { - DataNode n = zks.dataTree.getNode(path); + DataNode n = zks.getZKDatabase().getNode(path); if (n != null) { Long acl; Set children; @@ -142,7 +142,7 @@ } lastChange = new ChangeRecord(-1, path, n.stat, children != null ? children.size() : 0, - zks.dataTree.convertLong(acl)); + zks.getZKDatabase().convertLong(acl)); } } } @@ -278,7 +278,7 @@ path = deleteRequest.getPath(); lastSlash = path.lastIndexOf('/'); if (lastSlash == -1 || path.indexOf('\0') != -1 - || zks.dataTree.isSpecialPath(path)) { + || zks.getZKDatabase().isSpecialPath(path)) { throw new KeeperException.BadArgumentsException(path); } parentPath = path.substring(0, lastSlash); @@ -366,7 +366,7 @@ // queues up this operation without being the session owner. 
// this request is the last of the session so it should be ok //zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); - HashSet es = zks.dataTree + HashSet es = zks.getZKDatabase() .getEphemerals(request.sessionId); synchronized (zks.outstandingChanges) { for (ChangeRecord c : zks.outstandingChanges) { Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Fri Dec 18 02:19:50 2009 @@ -104,12 +104,12 @@ } if (si != null) { // track the number of records written to the log - if (zks.getLogWriter().append(si)) { + if (zks.getZKDatabase().append(si)) { logCount++; if (logCount > (snapCount / 2 + randRoll)) { randRoll = r.nextInt(snapCount/2); // roll the log - zks.getLogWriter().rollLog(); + zks.getZKDatabase().rollLog(); // take a snapshot if (snapInProcess != null && snapInProcess.isAlive()) { LOG.warn("Too busy to snap, skipping"); @@ -155,7 +155,7 @@ if (toFlush.isEmpty()) return; - zks.getLogWriter().commit(); + zks.getZKDatabase().commit(); while (!toFlush.isEmpty()) { Request i = toFlush.remove(); nextProcessor.processRequest(i); Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=892111&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (added) +++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Fri Dec 18 02:19:50 2009 @@ -0,0 +1,454 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.InputArchive; +import org.apache.jute.OutputArchive; +import org.apache.jute.Record; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.DataTree.ProcessTxnResult; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener; +import org.apache.zookeeper.server.quorum.Leader; +import org.apache.zookeeper.server.quorum.QuorumPacket; +import org.apache.zookeeper.server.quorum.Leader.Proposal; 
+import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.txn.TxnHeader; + +/** + * This class maintains the in memory database of zookeeper + * server states that includes the sessions, datatree and the + * committed logs. It is booted up after reading the logs + * and snapshots from the disk. + */ +public class ZKDatabase { + + private static final Logger LOG = Logger.getLogger(ZKDatabase.class); + + /** + * make sure on a clear you take care of + * all these members. + */ + protected DataTree dataTree; + protected ConcurrentHashMap sessionsWithTimeouts; + protected FileTxnSnapLog snapLog; + protected long minCommittedLog, maxCommittedLog; + public static final int commitLogCount = 500; + protected static int commitLogBuffer = 700; + protected LinkedList committedLog = new LinkedList(); + volatile private boolean initialized = false; + + /** + * the filetxnsnaplog that this zk database + * maps to. There is a one to one relationship + * between a filetxnsnaplog and zkdatabase. + * @param snapLog the FileTxnSnapLog mapping this zkdatabase + */ + public ZKDatabase(FileTxnSnapLog snapLog) { + dataTree = new DataTree(); + sessionsWithTimeouts = new ConcurrentHashMap(); + this.snapLog = snapLog; + } + + /** + * checks to see if the zk database has been + * initialized or not. + * @return true if zk database is initialized and false if not + */ + public boolean isInitialized() { + return initialized; + } + + /** + * clear the zkdatabase. + * Note to developers - be careful to see that + * the clear method does clear out all the + * data structures in zkdatabase. + */ + public void clear() { + minCommittedLog = 0; + maxCommittedLog = 0; + /* to be safe we just create a new + * datatree. 
+ */ + dataTree = new DataTree(); + sessionsWithTimeouts.clear(); + committedLog.clear(); + initialized = false; + } + + /** + * the datatree for this zkdatabase + * @return the datatree for this zkdatabase + */ + public DataTree getDataTree() { + return this.dataTree; + } + + /** + * the committed log for this zk database + * @return the committed log for this zkdatabase + */ + public long getmaxCommittedLog() { + return maxCommittedLog; + } + + + /** + * the minimum committed transaction log + * available in memory + * @return the minimum committed transaction + * log available in memory + */ + public long getminCommittedLog() { + return minCommittedLog; + } + + public LinkedList getCommittedLog() { + return this.committedLog; + } + + /** + * get the last processed zxid from a datatree + * @return the last processed zxid of a datatree + */ + public long getDataTreeLastProcessedZxid() { + return dataTree.lastProcessedZxid; + } + + /** + * set the datatree initialized or not + * @param b set the datatree initialized to b + */ + public void setDataTreeInit(boolean b) { + dataTree.initialized = b; + } + + /** + * return the sessions in the datatree + * @return the data tree sessions + */ + public Collection getSessions() { + return dataTree.getSessions(); + } + + /** + * get sessions with timeouts + * @return the hashmap of sessions with timeouts + */ + public ConcurrentHashMap getSessionWithTimeOuts() { + return sessionsWithTimeouts; + } + + + /** + * load the database from the disk onto memory and also add + * the transactions to the committedlog in memory. 
+ * @return the last valid zxid on disk + * @throws IOException + */ + public long loadDataBase() throws IOException { + PlayBackListener listener=new PlayBackListener(){ + public void onTxnLoaded(TxnHeader hdr,Record txn){ + Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(), + null, null); + r.txn = txn; + r.hdr = hdr; + r.zxid = hdr.getZxid(); + addCommittedProposal(r); + } + }; + + long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener); + initialized = true; + return zxid; + } + + /** + * maintains a list of last committedLog + * or so committed requests. This is used for + * fast follower synchronization. + * @param request committed request + */ + public void addCommittedProposal(Request request) { + synchronized (committedLog) { + if (committedLog.size() > commitLogCount) { + committedLog.removeFirst(); + minCommittedLog = committedLog.getFirst().packet.getZxid(); + } + if (committedLog.size() == 0) { + minCommittedLog = request.zxid; + maxCommittedLog = request.zxid; + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + try { + request.hdr.serialize(boa, "hdr"); + if (request.txn != null) { + request.txn.serialize(boa, "txn"); + } + baos.close(); + } catch (IOException e) { + LOG.error("This really should be impossible", e); + } + QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, + baos.toByteArray(), null); + Proposal p = new Proposal(); + p.packet = pp; + p.request = request; + committedLog.add(p); + maxCommittedLog = p.packet.getZxid(); + } + } + + + /** + * remove a cnxn from the datatree + * @param cnxn the cnxn to remove from the datatree + */ + public void removeCnxn(ServerCnxn cnxn) { + dataTree.removeCnxn(cnxn); + } + + /** + * kill a given session in the datatree + * @param sessionId the session id to be killed + * @param zxid the zxid of kill session transaction + */ + public void killSession(long sessionId, long zxid) { + 
dataTree.killSession(sessionId, zxid); + } + + /** + * get a string dump of all the ephemerals in + * the datatree + * @return the string dump of ephemerals + */ + public String dumpEphemerals() { + return dataTree.dumpEphemerals(); + } + + /** + * the node count of the datatree + * @return the node count of datatree + */ + public int getNodeCount() { + return dataTree.getNodeCount(); + } + + /** + * the paths for ephemeral session id + * @param sessionId the session id for which paths match to + * @return the paths for a session id + */ + public HashSet getEphemerals(long sessionId) { + return dataTree.getEphemerals(sessionId); + } + + /** + * the last processed zxid in the datatree + * @param zxid the last processed zxid in the datatree + */ + public void setlastProcessedZxid(long zxid) { + dataTree.lastProcessedZxid = zxid; + } + + /** + * the process txn on the data + * @param hdr the txnheader for the txn + * @param txn the transaction that needs to be processed + * @return the result of processing the transaction on this + * datatree/zkdatabase + */ + public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { + return dataTree.processTxn(hdr, txn); + } + + /** + * stat the path + * @param path the path for which stat is to be done + * @param serverCnxn the servercnxn attached to this request + * @return the stat of this node + * @throws KeeperException.NoNodeException + */ + public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException { + return dataTree.statNode(path, serverCnxn); + } + + /** + * get the datanode for this path + * @param path the path to lookup + * @return the datanode for getting the path + */ + public DataNode getNode(String path) { + return dataTree.getNode(path); + } + + /** + * convert from long to the acl entry + * @param aclL the long for which to get the acl + * @return the acl corresponding to this long entry + */ + public List convertLong(Long aclL) { + return dataTree.convertLong(aclL); + } 
+ + /** + * get data and stat for a path + * @param path the path being queried + * @param stat the stat for this path + * @param watcher the watcher function + * @return + * @throws KeeperException.NoNodeException + */ + public byte[] getData(String path, Stat stat, Watcher watcher) + throws KeeperException.NoNodeException { + return dataTree.getData(path, stat, watcher); + } + + /** + * set watches on the datatree + * @param relativeZxid the relative zxid that client has seen + * @param dataWatches the data watches the client wants to reset + * @param existWatches the exists watches the client wants to reset + * @param childWatches the child watches the client wants to reset + * @param watcher the watcher function + */ + public void setWatches(long relativeZxid, List dataWatches, + List existWatches, List childWatches, Watcher watcher) { + dataTree.setWatches(relativeZxid, dataWatches, existWatches, childWatches, watcher); + } + + /** + * get acl for a path + * @param path the path to query for acl + * @param stat the stat for the node + * @return the acl list for this path + * @throws NoNodeException + */ + public List getACL(String path, Stat stat) throws NoNodeException { + return dataTree.getACL(path, stat); + } + + /** + * get children list for this path + * @param path the path of the node + * @param stat the stat of the node + * @param watcher the watcher function for this path + * @return the list of children for this path + * @throws KeeperException.NoNodeException + */ + public List getChildren(String path, Stat stat, Watcher watcher) + throws KeeperException.NoNodeException { + return dataTree.getChildren(path, stat, watcher); + } + + /** + * check if the path is special or not + * @param path the input path + * @return true if path is special and false if not + */ + public boolean isSpecialPath(String path) { + return dataTree.isSpecialPath(path); + } + + /** + * get the acl size of the datatree + * @return the acl size of the datatree + */ + public 
int getAclSize() { + return dataTree.longKeyMap.size(); + } + + /** + * truncate the zkdatabase to this zxid + * @param zxid the zxid to truncate zk database to + * @return true if the truncate is succesful and false if not + * @throws IOException + */ + public boolean truncateLog(long zxid) throws IOException { + clear(); + boolean truncated = this.snapLog.truncateLog(zxid); + loadDataBase(); + return truncated; + } + + /** + * deserialize a snapshot from an input archive + * @param ia the input archive you want to deserialize from + * @throws IOException + */ + public void deserializeSnapshot(InputArchive ia) throws IOException { + clear(); + SerializeUtils.deserializeSnapshot(getDataTree(),ia,getSessionWithTimeOuts()); + initialized = true; + } + + /** + * serialize the snapshot + * @param oa the output archive to which the snapshot needs to be serialized + * @throws IOException + * @throws InterruptedException + */ + public void serializeSnapshot(OutputArchive oa) throws IOException, + InterruptedException { + SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts()); + } + + /** + * append to the underlying transaction log + * @param si the request to append + * @return true if the append was succesfull and false if not + */ + public boolean append(Request si) throws IOException { + return this.snapLog.append(si); + } + + /** + * roll the underlying log + */ + public void rollLog() throws IOException { + this.snapLog.rollLog(); + } + + /** + * commit to the underlying transaction log + * @throws IOException + */ + public void commit() throws IOException { + this.snapLog.commit(); + } + + +} \ No newline at end of file Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=892111&r1=892110&r2=892111&view=diff 
============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Fri Dec 18 02:19:50 2009 @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -96,16 +97,10 @@ public static final int DEFAULT_TICK_TIME = 3000; protected int tickTime = DEFAULT_TICK_TIME; - - public static final int commitLogCount = 500; - public int commitLogBuffer = 700; - public final LinkedList committedLog = new LinkedList(); - public long minCommittedLog, maxCommittedLog; - private DataTreeBuilder treeBuilder; - public DataTree dataTree; protected SessionTracker sessionTracker; private FileTxnSnapLog txnLogFactory = null; - protected ConcurrentHashMap sessionsWithTimeouts; + private ConcurrentHashMap sessionsWithTimeouts; + private ZKDatabase zkDb; protected long hzxid = 0; public final static Exception ok = new Exception("No prob"); protected RequestProcessor firstProcessor; @@ -128,7 +123,7 @@ private final ServerStats serverStats; void removeCnxn(ServerCnxn cnxn) { - dataTree.removeCnxn(cnxn); + zkDb.removeCnxn(cnxn); } /** @@ -140,27 +135,38 @@ */ public ZooKeeperServer() { serverStats = new ServerStats(this); - treeBuilder = new BasicDataTreeBuilder(); } /** * Creates a ZooKeeperServer instance. It sets everything up, but doesn't * actually start listening for clients until run() is invoked. 
- * + * * @param dataDir the directory to put the data * @throws IOException */ - public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, - DataTreeBuilder treeBuilder) throws IOException { + public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, + DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException { serverStats = new ServerStats(this); - this.treeBuilder = treeBuilder; - this.txnLogFactory = txnLogFactory; + this.zkDb = zkDb; this.tickTime = tickTime; - LOG.info("Created server"); + LOG.info("Created server with tickTime " + tickTime + " datadir " + + txnLogFactory.getDataDir() + " snapdir " + txnLogFactory.getSnapDir()); } + /** + * creates a zookeeperserver instance. + * @param txnLogFactory the file transaction snapshot logging class + * @param tickTime the ticktime for the server + * @param treeBuilder the datatree builder + * @throws IOException + */ + public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, + DataTreeBuilder treeBuilder) throws IOException { + this(txnLogFactory, tickTime, treeBuilder, new ZKDatabase(txnLogFactory)); + } + public ServerStats serverStats() { return serverStats; } @@ -172,7 +178,7 @@ */ public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException { - this(new FileTxnSnapLog(snapDir,logDir), + this( new FileTxnSnapLog(snapDir, logDir), tickTime,new BasicDataTreeBuilder()); } @@ -182,84 +188,51 @@ * @throws IOException */ public ZooKeeperServer(FileTxnSnapLog txnLogFactory,DataTreeBuilder treeBuilder) throws IOException { - this(txnLogFactory, DEFAULT_TICK_TIME, treeBuilder); + this(txnLogFactory, DEFAULT_TICK_TIME, treeBuilder, new ZKDatabase(txnLogFactory)); } /** + * get the zookeeper database for this server + * @return the zookeeper database for this server + */ + public ZKDatabase getZKDatabase() { + return this.zkDb; + } + + /** + * set the zkdatabase for this zookeeper server + * @param zkDb + */ + public void setZKDatabase(ZKDatabase zkDb) { + 
this.zkDb = zkDb; + } + + /** * Restore sessions and data */ public void loadData() throws IOException, InterruptedException { - PlayBackListener listener=new PlayBackListener(){ - public void onTxnLoaded(TxnHeader hdr,Record txn){ - Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(), - null, null); - r.txn = txn; - r.hdr = hdr; - r.zxid = hdr.getZxid(); - addCommittedProposal(r); - } - }; - sessionsWithTimeouts = new ConcurrentHashMap(); - dataTree = treeBuilder.build(); - setZxid(txnLogFactory.restore(dataTree,sessionsWithTimeouts,listener)); + zkDb.loadDataBase(); + setZxid(zkDb.loadDataBase()); // Clean up dead sessions LinkedList deadSessions = new LinkedList(); - for (long session : dataTree.getSessions()) { + for (long session : zkDb.getSessions()) { + sessionsWithTimeouts = zkDb.getSessionWithTimeOuts(); if (sessionsWithTimeouts.get(session) == null) { deadSessions.add(session); } } - dataTree.initialized = true; + zkDb.setDataTreeInit(true); for (long session : deadSessions) { // XXX: Is lastProcessedZxid really the best thing to use? - killSession(session, dataTree.lastProcessedZxid); + killSession(session, zkDb.getDataTreeLastProcessedZxid()); } // Make a clean snapshot takeSnapshot(); } - /** - * maintains a list of last 500 or so committed requests. This is used for - * fast follower synchronization. 
- * - * @param request committed request - */ - - public void addCommittedProposal(Request request) { - synchronized (committedLog) { - if (committedLog.size() > commitLogCount) { - committedLog.removeFirst(); - minCommittedLog = committedLog.getFirst().packet.getZxid(); - } - if (committedLog.size() == 0) { - minCommittedLog = request.zxid; - maxCommittedLog = request.zxid; - } - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); - try { - request.hdr.serialize(boa, "hdr"); - if (request.txn != null) { - request.txn.serialize(boa, "txn"); - } - baos.close(); - } catch (IOException e) { - LOG.error("This really should be impossible", e); - } - QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, - baos.toByteArray(), null); - Proposal p = new Proposal(); - p.packet = pp; - p.request = request; - committedLog.add(p); - maxCommittedLog = p.packet.getZxid(); - } - } - public void takeSnapshot(){ try { - txnLogFactory.save(dataTree, sessionsWithTimeouts); + txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts()); } catch (IOException e) { LOG.fatal("Severe unrecoverable error, exiting", e); // This is a severe error that we cannot recover from, @@ -268,18 +241,7 @@ } } - public void serializeSnapshot(OutputArchive oa) throws IOException, - InterruptedException { - SerializeUtils.serializeSnapshot(dataTree, oa, sessionsWithTimeouts); - } - - public void deserializeSnapshot(InputArchive ia) throws IOException { - sessionsWithTimeouts = new ConcurrentHashMap(); - dataTree = treeBuilder.build(); - - SerializeUtils.deserializeSnapshot(dataTree,ia,sessionsWithTimeouts); - } - + /** * This should be called from a synchronized block on this! 
*/ @@ -312,7 +274,7 @@ } protected void killSession(long sessionId, long zxid) { - dataTree.killSession(sessionId, zxid); + zkDb.killSession(sessionId, zxid); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "ZooKeeperServer --- killSession: 0x" @@ -358,7 +320,7 @@ MBeanRegistry.getInstance().register(jmxServerBean, null); try { - jmxDataTreeBean = new DataTreeBean(dataTree); + jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree()); MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); @@ -371,7 +333,11 @@ } public void startup() throws IOException, InterruptedException { - if (dataTree == null) { + //check to see if zkDb is not null + if (zkDb == null) { + zkDb = new ZKDatabase(this.txnLogFactory); + } + if (!zkDb.isInitialized()) { loadData(); } createSessionTracker(); @@ -395,7 +361,7 @@ } protected void createSessionTracker() { - sessionTracker = new SessionTrackerImpl(this, sessionsWithTimeouts, + sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, 1); ((SessionTrackerImpl)sessionTracker).start(); } @@ -416,8 +382,8 @@ if (firstProcessor != null) { firstProcessor.shutdown(); } - if (dataTree != null) { - dataTree.clear(); + if (zkDb != null) { + zkDb.clear(); } unregisterJMX(); @@ -638,7 +604,7 @@ * datatree */ public long getLastProcessedZxid() { - return dataTree.lastProcessedZxid; + return zkDb.getDataTreeLastProcessedZxid(); } /** @@ -658,17 +624,9 @@ * @throws IOException */ public void truncateLog(long zxid) throws IOException { - this.txnLogFactory.truncateLog(zxid); + this.zkDb.truncateLog(zxid); } - - /** - * the snapshot and logwriter for this instance - * @return - */ - public FileTxnSnapLog getLogWriter() { - return this.txnLogFactory; - } - + public int getTickTime() { return tickTime; } @@ -677,14 +635,6 @@ this.tickTime = tickTime; } - public DataTreeBuilder getTreeBuilder() { - return 
treeBuilder; - } - - public void setTreeBuilder(DataTreeBuilder treeBuilder) { - this.treeBuilder = treeBuilder; - } - public int getClientPort() { return serverCnxnFactory != null ? serverCnxnFactory.ss.socket().getLocalPort() : -1; } @@ -700,4 +650,6 @@ public String getState() { return "standalone"; } + + } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java Fri Dec 18 02:19:50 2009 @@ -61,7 +61,7 @@ /** * deserialize a data tree from the most recent snapshot * @return the zxid of the snapshot - */ + */ public long deserialize(DataTree dt, Map sessions) throws IOException { // we run through 100 snapshots (not all of them) Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java Fri Dec 18 02:19:50 2009 @@ -28,6 +28,7 @@ import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.SyncRequestProcessor; +import org.apache.zookeeper.server.ZKDatabase; import 
org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.txn.TxnHeader; @@ -56,8 +57,8 @@ * @throws IOException */ FollowerZooKeeperServer(FileTxnSnapLog logFactory,QuorumPeer self, - DataTreeBuilder treeBuilder) throws IOException { - super(logFactory, self.tickTime,treeBuilder); + DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException { + super(logFactory, self.tickTime,treeBuilder, zkDb); this.self = self; this.pendingSyncs = new ConcurrentLinkedQueue(); } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Fri Dec 18 02:19:50 2009 @@ -279,7 +279,7 @@ long epoch = self.getLastLoggedZxid() >> 32L; epoch++; zk.setZxid(epoch << 32L); - zk.dataTree.lastProcessedZxid = zk.getZxid(); + zk.getZKDatabase().setlastProcessedZxid(zk.getZxid()); synchronized(this){ lastProposed = zk.getZxid(); Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Fri Dec 18 02:19:50 2009 @@ -28,6 +28,7 @@ import org.apache.zookeeper.server.RequestProcessor; import 
org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.SessionTrackerImpl; +import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; @@ -40,7 +41,7 @@ */ public class LeaderZooKeeperServer extends ZooKeeperServer { private QuorumPeer self; - + CommitProcessor commitProcessor; /** @@ -49,8 +50,8 @@ * @throws IOException */ LeaderZooKeeperServer(FileTxnSnapLog logFactory,QuorumPeer self, - DataTreeBuilder treeBuilder) throws IOException { - super(logFactory, self.tickTime,treeBuilder); + DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException { + super(logFactory, self.tickTime,treeBuilder, zkDb); this.self = self; } @@ -80,7 +81,7 @@ @Override protected void createSessionTracker() { - sessionTracker = new SessionTrackerImpl(this, sessionsWithTimeouts, + sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(), tickTime, self.getId()); ((SessionTrackerImpl)sessionTracker).start(); } @@ -94,7 +95,7 @@ protected void registerJMX() { // register with JMX try { - jmxDataTreeBean = new DataTreeBean(dataTree); + jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree()); MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Fri Dec 18 02:19:50 2009 @@ -280,12 +280,13 @@ synchronized (zk) { if (qp.getType() == 
Leader.DIFF) { LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid())); - zk.loadData(); } else if (qp.getType() == Leader.SNAP) { LOG.info("Getting a snapshot from leader"); // The leader is going to dump the database - zk.deserializeSnapshot(leaderIs); + // clear our own database and read + zk.getZKDatabase().clear(); + zk.getZKDatabase().deserializeSnapshot(leaderIs); String signature = leaderIs.readString("signature"); if (!signature.equals("BenWasHere")) { LOG.error("Missing signature. Got " + signature); @@ -295,7 +296,7 @@ //we need to truncate the log to the lastzxid of the leader LOG.warn("Truncating log to get in sync with the leader 0x" + Long.toHexString(qp.getZxid())); - boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid()); + boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid()); if (!truncated) { // not able to truncate the log LOG.fatal("Not able to truncate the log " @@ -303,7 +304,6 @@ System.exit(13); } - zk.loadData(); } else { LOG.fatal("Got unexpected packet from leader " @@ -311,7 +311,7 @@ System.exit(13); } - zk.dataTree.lastProcessedZxid = newLeaderZxid; + zk.getZKDatabase().setlastProcessedZxid(newLeaderZxid); } ack.setZxid(newLeaderZxid & ~0xffffffffL); writePacket(ack, true); Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Fri Dec 18 02:19:50 2009 @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.LinkedList; import 
java.util.concurrent.LinkedBlockingQueue; import org.apache.jute.BinaryInputArchive; @@ -254,13 +255,14 @@ /* we are sending the diff check if we have proposals in memory to be able to * send a diff to the */ - synchronized(leader.zk.committedLog) { - if (leader.zk.committedLog.size() != 0) { - if ((leader.zk.maxCommittedLog >= peerLastZxid) - && (leader.zk.minCommittedLog <= peerLastZxid)) { + LinkedList proposals = leader.zk.getZKDatabase().getCommittedLog(); + synchronized(proposals) { + if (proposals.size() != 0) { + if ((leader.zk.getZKDatabase().getmaxCommittedLog() >= peerLastZxid) + && (leader.zk.getZKDatabase().getminCommittedLog() <= peerLastZxid)) { packetToSend = Leader.DIFF; - zxidToSend = leader.zk.maxCommittedLog; - for (Proposal propose: leader.zk.committedLog) { + zxidToSend = leader.zk.getZKDatabase().getmaxCommittedLog(); + for (Proposal propose: proposals) { if (propose.packet.getZxid() > peerLastZxid) { queuePacket(propose.packet); QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(), @@ -274,7 +276,7 @@ else { logTxns = false; } - } + } //check if we decided to send a diff or we need to send a truncate // we avoid using epochs for truncating because epochs make things @@ -282,11 +284,11 @@ // only if we know that there is a committed zxid in the queue that // is less than the one the peer has we send a trunc else to make // things simple we just send sanpshot. 
- if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) { + if (logTxns && (peerLastZxid > leader.zk.getZKDatabase().getmaxCommittedLog())) { // this is the only case that we are sure that // we can ask the peer to truncate the log packetToSend = Leader.TRUNC; - zxidToSend = leader.zk.maxCommittedLog; + zxidToSend = leader.zk.getZKDatabase().getmaxCommittedLog(); updates = zxidToSend; } @@ -318,7 +320,7 @@ + " zxid of leader is 0x" + Long.toHexString(leaderLastZxid)); // Dump data to peer - leader.zk.serializeSnapshot(oa); + leader.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); } bufferedOutput.flush(); Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java Fri Dec 18 02:19:50 2009 @@ -24,6 +24,7 @@ import org.apache.zookeeper.server.DataTreeBean; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServerBean; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; @@ -36,8 +37,8 @@ protected QuorumPeer self; public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, - DataTreeBuilder treeBuilder) throws IOException { - super(logFactory,tickTime,treeBuilder); + DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException { + super(logFactory,tickTime,treeBuilder, zkDb); } /** @@ -68,17 +69,9 @@ return 
self.getId(); } - /** - * Learners don't make use of this method, only Leaders. - */ - @Override - public void addCommittedProposal(Request request) { - // Don't do anything! - } - @Override protected void createSessionTracker() { - sessionTracker = new LearnerSessionTracker(this, sessionsWithTimeouts, + sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(), self.getId()); } @@ -92,7 +85,7 @@ protected void registerJMX() { // register with JMX try { - jmxDataTreeBean = new DataTreeBean(dataTree); + jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree()); MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java Fri Dec 18 02:19:50 2009 @@ -25,6 +25,7 @@ import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.SyncRequestProcessor; +import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; /** @@ -48,8 +49,8 @@ new ConcurrentLinkedQueue(); ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, - DataTreeBuilder treeBuilder) throws IOException { - super(logFactory, self.tickTime, treeBuilder); + DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException { + super(logFactory, self.tickTime, treeBuilder, zkDb); this.self = 
self; } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Dec 18 02:19:50 2009 @@ -33,7 +33,9 @@ import org.apache.log4j.Logger; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.jmx.ZKMBeanInfo; +import org.apache.zookeeper.server.DataTree; import org.apache.zookeeper.server.NIOServerCnxn; +import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.persistence.Util; @@ -73,7 +75,15 @@ QuorumBean jmxQuorumBean; LocalPeerBean jmxLocalPeerBean; LeaderElectionBean jmxLeaderElectionBean; - + + /* ZKDatabase is a top level member of quorumpeer + * which will be used in all the zookeeperservers + * instantiated later. 
Also, it is created once on + * bootup and only thrown away in case of a truncate + * message from the leader + */ + private ZKDatabase zkDb; + /** * Create an instance of a quorum peer */ @@ -351,6 +361,7 @@ this.initLimit = initLimit; this.syncLimit = syncLimit; this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir); + this.zkDb = new ZKDatabase(this.logFactory); if(quorumConfig == null) this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers)); else this.quorumConfig = quorumConfig; @@ -362,6 +373,12 @@ @Override public synchronized void start() { + try { + zkDb.loadDataBase(); + } catch(IOException ie) { + LOG.fatal("Unable to load database on disk", ie); + throw new RuntimeException("Unable to run quorum server ", ie); + } cnxnFactory.start(); startLeaderElection(); super.start(); @@ -447,27 +464,16 @@ * @return the highest zxid for this host */ public long getLastLoggedZxid() { - /* - * it is possible to have the last zxid with just a snapshot and no log - * related to it. one example is during upgrade wherein the there is no - * corresponding log to the snapshot. 
in that case just use the snapshot - * zxid - */ - - File lastSnapshot = null; - long maxZxid = -1L; - long maxLogZxid = logFactory.getLastLoggedZxid(); + long lastLogged= -1L; try { - lastSnapshot = logFactory.findMostRecentSnapshot(); - if (lastSnapshot != null) { - maxZxid = Math.max(Util.getZxidFromName(lastSnapshot.getName(), - "snapshot"), maxLogZxid); - } - } catch (IOException ie) { - LOG.warn("Exception finding last snapshot ", ie); - maxZxid = maxLogZxid; + if (!zkDb.isInitialized()) { + zkDb.loadDataBase(); + } + lastLogged = zkDb.getDataTreeLastProcessedZxid(); + } catch(IOException ie) { + LOG.warn("Unable to load database ", ie); } - return maxZxid; + return lastLogged; } public Follower follower; @@ -476,17 +482,17 @@ protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { return new Follower(this, new FollowerZooKeeperServer(logFactory, - this,new ZooKeeperServer.BasicDataTreeBuilder())); + this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb)); } protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException { return new Leader(this, new LeaderZooKeeperServer(logFactory, - this,new ZooKeeperServer.BasicDataTreeBuilder())); + this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb)); } protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException { return new Observer(this, new ObserverZooKeeperServer(logFactory, - this, new ZooKeeperServer.BasicDataTreeBuilder())); + this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb)); } private Election createElectionAlgorithm(int electionAlgorithm){ @@ -878,4 +884,12 @@ public FileTxnSnapLog getTxnFactory() { return this.logFactory; } + + /** + * set zk database for this node + * @param database + */ + public void setZKDatabase(ZKDatabase database) { + this.zkDb = database; + } } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Fri Dec 18 02:19:50 2009 @@ -25,6 +25,7 @@ import org.apache.log4j.Logger; import org.apache.zookeeper.jmx.ManagedUtil; import org.apache.zookeeper.server.NIOServerCnxn; +import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServerMain; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; @@ -133,6 +134,7 @@ quorumPeer.setSyncLimit(config.getSyncLimit()); quorumPeer.setQuorumVerifier(config.getQuorumVerifier()); quorumPeer.setCnxnFactory(cnxnFactory); + quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setPeerType(config.getPeerType()); quorumPeer.start(); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java Fri Dec 18 02:19:50 2009 @@ -106,7 +106,7 @@ zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } - assertTrue("size of the acl map ", (1 == zks.dataTree.longKeyMap.size())); + assertTrue("size of the acl map ", (1 == zks.getZKDatabase().getAclSize())); for (int j = 100; j < 200; j++) { path = "/" + j; ACL acl = new ACL(); 
@@ -119,7 +119,7 @@ list.add(acl); zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT); } - assertTrue("size of the acl map ", (101 == zks.dataTree.longKeyMap.size())); + assertTrue("size of the acl map ", (101 == zks.getZKDatabase().getAclSize())); // now shutdown the server and restart it f.shutdown(); assertTrue("waiting for server down", @@ -138,7 +138,7 @@ TimeUnit.MILLISECONDS); assertTrue("count == 0", startSignal.getCount() == 0); - assertTrue("acl map ", (101 == zks.dataTree.longKeyMap.size())); + assertTrue("acl map ", (101 == zks.getZKDatabase().getAclSize())); for (int j = 200; j < 205; j++) { path = "/" + j; ACL acl = new ACL(); @@ -151,7 +151,7 @@ list.add(acl); zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT); } - assertTrue("acl map ", (106 == zks.dataTree.longKeyMap.size())); + assertTrue("acl map ", (106 == zks.getZKDatabase().getAclSize())); zk.close(); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java?rev=892111&r1=892110&r2=892111&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java Fri Dec 18 02:19:50 2009 @@ -187,8 +187,8 @@ LOG.error("Null listener when initializing cnx manager"); } - cnxManager.toSend(new Long(1), createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1)); + cnxManager.toSend(new Long(1), createMsg(ServerState.LOOKING.ordinal(), 0, 0, 1)); cnxManager.recvQueue.take(); - cnxManager.toSend(new Long(1), createMsg(ServerState.FOLLOWING.ordinal(), 1, -1, 1)); + cnxManager.toSend(new Long(1), createMsg(ServerState.FOLLOWING.ordinal(), 1, 0, 1)); } } \ No newline at end of file Added: 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ZkDatabaseCorruptionTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ZkDatabaseCorruptionTest.java?rev=892111&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ZkDatabaseCorruptionTest.java (added) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ZkDatabaseCorruptionTest.java Fri Dec 18 02:19:50 2009 @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Set; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.SyncRequestProcessor; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.junit.Before; + + +public class ZkDatabaseCorruptionTest extends QuorumBase { + protected static final Logger LOG = Logger.getLogger(ZkDatabaseCorruptionTest.class); + public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT; + + private final QuorumBase qb = new QuorumBase(); + + @Before + @Override + protected void setUp() throws Exception { + qb.setUp(); + } + + protected void tearDown() throws Exception { + } + + private void corruptFile(File f) throws IOException { + RandomAccessFile outFile = new RandomAccessFile(f, "rw"); + outFile.write("fail servers".getBytes()); + outFile.close(); + } + + private void corruptAllSnapshots(File snapDir) throws IOException { + File[] listFiles = snapDir.listFiles(); + for (File f: listFiles) { + if (f.getName().startsWith("snapshot")) { + corruptFile(f); + } + } + } + + public void testCorruption() throws Exception { + ClientBase.waitForServerUp(qb.hostPort, 10000); + ClientBase.waitForServerUp(qb.hostPort, 10000); + ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() { + public void process(WatchedEvent 
event) { + }}); + SyncRequestProcessor.setSnapCount(100); + for (int i = 0; i < 2000; i++) { + zk.create("/0-" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + zk.close(); + QuorumPeer leader; + //find out who is the leader and kill it + if ( qb.s5.getPeerState() != ServerState.LEADING) { + throw new Exception("the last server is not the leader"); + } + leader = qb.s5; + // now corrupt the quorum peer database + FileTxnSnapLog snapLog = leader.getTxnFactory(); + File snapDir= snapLog.getSnapDir(); + //corrupt all the snapshots in the snapshot directory + corruptAllSnapshots(snapDir); + qb.shutdownServers(); + qb.setupServers(); + qb.s1.start(); + qb.s2.start(); + qb.s3.start(); + qb.s4.start(); + try { + qb.s5.start(); + assertTrue(false); + } catch(RuntimeException re) { + LOG.info("Got an error: expected", re); + } + //wait for servers to be up + String[] list = qb.hostPort.split(","); + for (int i =0; i < 4; i++) { + String hp = list[i]; + assertTrue("waiting for server up", + ClientBase.waitForServerUp(hp, + CONNECTION_TIMEOUT)); + LOG.info(hp + " is accepting client connections"); + } + + zk = qb.createClient(); + SyncRequestProcessor.setSnapCount(100); + for (int i = 2000; i < 4000; i++) { + zk.create("/0-" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + zk.close(); + qb.s1.shutdown(); + qb.s2.shutdown(); + qb.s3.shutdown(); + qb.s4.shutdown(); + } + + +} \ No newline at end of file