Author: mahadev
Date: Fri Dec 18 02:19:50 2009
New Revision: 892111
URL: http://svn.apache.org/viewvc?rev=892111&view=rev
Log:
ZOOKEEPER-596. The last logged zxid calculated by zookeeper servers could cause problems in leader election if data gets corrupted. (mahadev)
Added:
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ZkDatabaseCorruptionTest.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Dec 18 02:19:50 2009
@@ -187,6 +187,9 @@
ZOOKEEPER-600. TODO pondering about allocation behavior in zkpython may be
removed (gustavo via mahadev)
+ ZOOKEEPER-596. The last logged zxid calculated by zookeeper servers could
+ cause problems in leader election if data gets corrupted. (mahadev)
+
IMPROVEMENTS:
ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to
"socket reuse" and failure to close client (phunt via mahadev)
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java Fri Dec 18 02:19:50 2009
@@ -73,8 +73,8 @@
}
public String[] getEphemeralNodes() {
- if(zk.dataTree!=null){
- String[] res=zk.dataTree.getEphemerals(connection.getSessionId())
+ if(zk.getZKDatabase() !=null){
+ String[] res= zk.getZKDatabase().getEphemerals(connection.getSessionId())
.toArray(new String[0]);
Arrays.sort(res);
return res;
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Fri Dec 18 02:19:50 2009
@@ -99,7 +99,7 @@
}
}
if (request.hdr != null) {
- rc = zks.dataTree.processTxn(request.hdr, request.txn);
+ rc = zks.getZKDatabase().processTxn(request.hdr, request.txn);
if (request.type == OpCode.createSession) {
if (request.txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) request.txn;
@@ -116,7 +116,7 @@
}
// do not add non quorum packets to the queue.
if (Request.isQuorum(request.type)) {
- zks.addCommittedProposal(request);
+ zks.getZKDatabase().addCommittedProposal(request);
}
}
@@ -168,7 +168,7 @@
request.createTime, System.currentTimeMillis());
cnxn.sendResponse(new ReplyHeader(-2,
- zks.dataTree.lastProcessedZxid, 0), null, "response");
+ zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
return;
}
case OpCode.createSession: {
@@ -229,7 +229,7 @@
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
- Stat stat = zks.dataTree.statNode(path, existsRequest
+ Stat stat = zks.getZKDatabase().statNode(path, existsRequest
.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
break;
@@ -239,7 +239,7 @@
GetDataRequest getDataRequest = new GetDataRequest();
ZooKeeperServer.byteBuffer2Record(request.request,
getDataRequest);
- DataNode n = zks.dataTree.getNode(getDataRequest.getPath());
+ DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
@@ -247,11 +247,11 @@
synchronized(n) {
aclL = n.acl;
}
- PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(aclL),
+ PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
- byte b[] = zks.dataTree.getData(getDataRequest.getPath(), stat,
+ byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
@@ -263,7 +263,7 @@
request.request.rewind();
ZooKeeperServer.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
- zks.dataTree.setWatches(relativeZxid,
+ zks.getZKDatabase().setWatches(relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(), cnxn);
@@ -276,7 +276,7 @@
getACLRequest);
Stat stat = new Stat();
List<ACL> acl =
- zks.dataTree.getACL(getACLRequest.getPath(), stat);
+ zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
rsp = new GetACLResponse(acl, stat);
break;
}
@@ -285,18 +285,19 @@
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ZooKeeperServer.byteBuffer2Record(request.request,
getChildrenRequest);
- DataNode n = zks.dataTree.getNode(getChildrenRequest.getPath());
+ DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
Long aclG;
synchronized(n) {
aclG = n.acl;
+
}
- PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(aclG),
+ PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
ZooDefs.Perms.READ,
request.authInfo);
- List<String> children = zks.dataTree.getChildren(
+ List<String> children = zks.getZKDatabase().getChildren(
getChildrenRequest.getPath(), null, getChildrenRequest
.getWatch() ? cnxn : null);
rsp = new GetChildrenResponse(children);
@@ -308,7 +309,7 @@
ZooKeeperServer.byteBuffer2Record(request.request,
getChildren2Request);
Stat stat = new Stat();
- DataNode n = zks.dataTree.getNode(getChildren2Request.getPath());
+ DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
@@ -316,10 +317,10 @@
synchronized(n) {
aclG = n.acl;
}
- PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(aclG),
+ PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
ZooDefs.Perms.READ,
request.authInfo);
- List<String> children = zks.dataTree.getChildren(
+ List<String> children = zks.getZKDatabase().getChildren(
getChildren2Request.getPath(), stat, getChildren2Request
.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Fri Dec 18 02:19:50 2009
@@ -687,13 +687,13 @@
if (zk == null) {
throw new IOException("ZooKeeperServer not running");
}
- if (connReq.getLastZxidSeen() > zk.dataTree.lastProcessedZxid) {
+ if (connReq.getLastZxidSeen() > zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
String msg = "Refusing session request for client "
+ sock.socket().getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(connReq.getLastZxidSeen())
+ " our last zxid is 0x"
- + Long.toHexString(zk.dataTree.lastProcessedZxid)
+ + Long.toHexString(zk.getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
LOG.info(msg);
@@ -800,7 +800,7 @@
sb.append("SessionTracker dump: \n");
sb.append(zk.sessionTracker.toString()).append("\n");
sb.append("ephemeral nodes dump:\n");
- sb.append(zk.dataTree.dumpEphemerals()).append("\n");
+ sb.append(zk.getZKDatabase().dumpEphemerals()).append("\n");
sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
}
k.interestOps(SelectionKey.OP_WRITE);
@@ -825,7 +825,7 @@
sb.append("\n");
}
sb.append(zk.serverStats().toString());
- sb.append("Node count: ").append(zk.dataTree.getNodeCount()).
+ sb.append("Node count: ").append(zk.getZKDatabase().getNodeCount()).
append("\n");
} else {
sb.append("ZooKeeperServer not running\n");
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Fri Dec 18 02:19:50 2009
@@ -132,7 +132,7 @@
}
*/
if (lastChange == null) {
- DataNode n = zks.dataTree.getNode(path);
+ DataNode n = zks.getZKDatabase().getNode(path);
if (n != null) {
Long acl;
Set<String> children;
@@ -142,7 +142,7 @@
}
lastChange = new ChangeRecord(-1, path, n.stat,
children != null ? children.size() : 0,
- zks.dataTree.convertLong(acl));
+ zks.getZKDatabase().convertLong(acl));
}
}
}
@@ -278,7 +278,7 @@
path = deleteRequest.getPath();
lastSlash = path.lastIndexOf('/');
if (lastSlash == -1 || path.indexOf('\0') != -1
- || zks.dataTree.isSpecialPath(path)) {
+ || zks.getZKDatabase().isSpecialPath(path)) {
throw new KeeperException.BadArgumentsException(path);
}
parentPath = path.substring(0, lastSlash);
@@ -366,7 +366,7 @@
// queues up this operation without being the session owner.
// this request is the last of the session so it should be ok
//zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
- HashSet<String> es = zks.dataTree
+ HashSet<String> es = zks.getZKDatabase()
.getEphemerals(request.sessionId);
synchronized (zks.outstandingChanges) {
for (ChangeRecord c : zks.outstandingChanges) {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Fri Dec 18 02:19:50 2009
@@ -104,12 +104,12 @@
}
if (si != null) {
// track the number of records written to the log
- if (zks.getLogWriter().append(si)) {
+ if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// roll the log
- zks.getLogWriter().rollLog();
+ zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
@@ -155,7 +155,7 @@
if (toFlush.isEmpty())
return;
- zks.getLogWriter().commit();
+ zks.getZKDatabase().commit();
while (!toFlush.isEmpty()) {
Request i = toFlush.remove();
nextProcessor.processRequest(i);
Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=892111&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Fri Dec 18 02:19:50 2009
@@ -0,0 +1,454 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.QuorumPacket;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * This class maintains the in-memory database of a zookeeper server:
+ * the sessions with their timeouts, the datatree, and a bounded list of
+ * recently committed proposals. It is populated by reading the
+ * snapshots and transaction logs from disk (see {@link #loadDataBase()}).
+ * <p>
+ * {@link #addCommittedProposal(Request)} and {@link #clear()} modify the
+ * committed log and its min/max zxid bookkeeping while holding the
+ * monitor of the committedLog list.
+ */
+public class ZKDatabase {
+
+    private static final Logger LOG = Logger.getLogger(ZKDatabase.class);
+
+    /**
+     * Note to developers: {@link #clear()} must reset every piece of
+     * in-memory state declared below.
+     */
+    protected DataTree dataTree;
+    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
+    protected FileTxnSnapLog snapLog;
+    /** zxids of the oldest and newest proposals held in committedLog */
+    protected long minCommittedLog, maxCommittedLog;
+    /**
+     * once committedLog holds more than this many proposals, the oldest
+     * one is dropped before a new one is appended
+     */
+    public static final int commitLogCount = 500;
+    // NOTE(review): not referenced within this class
+    protected static int commitLogBuffer = 700;
+    protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
+    volatile private boolean initialized = false;
+
+    /**
+     * Creates an empty, uninitialized database backed by the given
+     * FileTxnSnapLog. There is a one to one relationship between a
+     * FileTxnSnapLog and a ZKDatabase.
+     * @param snapLog the FileTxnSnapLog mapping this zkdatabase
+     */
+    public ZKDatabase(FileTxnSnapLog snapLog) {
+        dataTree = new DataTree();
+        sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
+        this.snapLog = snapLog;
+    }
+
+    /**
+     * checks to see if the zk database has been
+     * initialized or not.
+     * @return true after {@link #loadDataBase()} or
+     * {@link #deserializeSnapshot(InputArchive)} has completed and
+     * {@link #clear()} has not been called since; false otherwise
+     */
+    public boolean isInitialized() {
+        return initialized;
+    }
+
+    /**
+     * clear the zkdatabase.
+     * Note to developers - be careful to see that
+     * the clear method does clear out all the
+     * data structures in zkdatabase.
+     */
+    public void clear() {
+        /* to be safe we just create a new
+         * datatree.
+         */
+        dataTree = new DataTree();
+        sessionsWithTimeouts.clear();
+        // addCommittedProposal updates the log and its zxid bounds while
+        // holding this monitor; reset them under the same lock so a
+        // concurrent add cannot interleave with a half-cleared log.
+        synchronized (committedLog) {
+            committedLog.clear();
+            minCommittedLog = 0;
+            maxCommittedLog = 0;
+        }
+        initialized = false;
+    }
+
+    /**
+     * the datatree for this zkdatabase
+     * @return the datatree for this zkdatabase
+     */
+    public DataTree getDataTree() {
+        return this.dataTree;
+    }
+
+    /**
+     * the zxid of the newest proposal in the in-memory committed log
+     * @return the maximum committed zxid available in memory, or 0 if
+     * the log has never been populated or has been cleared
+     */
+    public long getmaxCommittedLog() {
+        return maxCommittedLog;
+    }
+
+    /**
+     * the zxid of the oldest proposal in the in-memory committed log
+     * @return the minimum committed zxid available in memory, or 0 if
+     * the log has never been populated or has been cleared
+     */
+    public long getminCommittedLog() {
+        return minCommittedLog;
+    }
+
+    /**
+     * the in-memory log of recently committed proposals
+     * @return the live list backing this database (not a copy); callers
+     * that iterate it should synchronize on the returned list, since
+     * {@link #addCommittedProposal(Request)} mutates it under that lock
+     */
+    public LinkedList<Proposal> getCommittedLog() {
+        return this.committedLog;
+    }
+
+    /**
+     * get the last processed zxid from a datatree
+     * @return the last processed zxid of a datatree
+     */
+    public long getDataTreeLastProcessedZxid() {
+        return dataTree.lastProcessedZxid;
+    }
+
+    /**
+     * set the datatree initialized or not
+     * @param b set the datatree initialized to b
+     */
+    public void setDataTreeInit(boolean b) {
+        dataTree.initialized = b;
+    }
+
+    /**
+     * return the sessions in the datatree
+     * @return the data tree sessions
+     */
+    public Collection<Long> getSessions() {
+        return dataTree.getSessions();
+    }
+
+    /**
+     * get sessions with timeouts
+     * @return the live (not copied) map of session id to session timeout
+     */
+    public ConcurrentHashMap<Long, Integer> getSessionWithTimeOuts() {
+        return sessionsWithTimeouts;
+    }
+
+    /**
+     * load the database from the disk onto memory and also add
+     * the transactions to the committedlog in memory.
+     * <p>
+     * Restores into the current datatree and sessions map; callers that
+     * need a fresh load should call {@link #clear()} first (as
+     * {@link #truncateLog(long)} does).
+     * @return the last valid zxid on disk
+     * @throws IOException propagated from FileTxnSnapLog#restore
+     */
+    public long loadDataBase() throws IOException {
+        PlayBackListener listener = new PlayBackListener() {
+            public void onTxnLoaded(TxnHeader hdr, Record txn) {
+                // wrap each replayed txn in a Request so that it is also
+                // kept in the in-memory committed log
+                Request r = new Request(null, 0, hdr.getCxid(), hdr.getType(),
+                        null, null);
+                r.txn = txn;
+                r.hdr = hdr;
+                r.zxid = hdr.getZxid();
+                addCommittedProposal(r);
+            }
+        };
+
+        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, listener);
+        initialized = true;
+        return zxid;
+    }
+
+    /**
+     * maintains a list of the last {@link #commitLogCount}
+     * or so committed requests. This is used for
+     * fast follower synchronization.
+     * @param request committed request; its hdr must be non-null because
+     * it is serialized unconditionally
+     */
+    public void addCommittedProposal(Request request) {
+        synchronized (committedLog) {
+            // bound the log: drop the oldest proposal and advance the low mark
+            if (committedLog.size() > commitLogCount) {
+                committedLog.removeFirst();
+                minCommittedLog = committedLog.getFirst().packet.getZxid();
+            }
+            // the first entry establishes both bounds
+            if (committedLog.size() == 0) {
+                minCommittedLog = request.zxid;
+                maxCommittedLog = request.zxid;
+            }
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+            try {
+                request.hdr.serialize(boa, "hdr");
+                if (request.txn != null) {
+                    request.txn.serialize(boa, "txn");
+                }
+                baos.close();
+            } catch (IOException e) {
+                // the archive writes to an in-memory stream, so this is not expected
+                LOG.error("This really should be impossible", e);
+            }
+            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
+                    baos.toByteArray(), null);
+            Proposal p = new Proposal();
+            p.packet = pp;
+            p.request = request;
+            committedLog.add(p);
+            maxCommittedLog = p.packet.getZxid();
+        }
+    }
+
+    /**
+     * remove a cnxn from the datatree
+     * @param cnxn the cnxn to remove from the datatree
+     */
+    public void removeCnxn(ServerCnxn cnxn) {
+        dataTree.removeCnxn(cnxn);
+    }
+
+    /**
+     * kill a given session in the datatree
+     * @param sessionId the session id to be killed
+     * @param zxid the zxid of kill session transaction
+     */
+    public void killSession(long sessionId, long zxid) {
+        dataTree.killSession(sessionId, zxid);
+    }
+
+    /**
+     * get a string dump of all the ephemerals in
+     * the datatree
+     * @return the string dump of ephemerals
+     */
+    public String dumpEphemerals() {
+        return dataTree.dumpEphemerals();
+    }
+
+    /**
+     * the node count of the datatree
+     * @return the node count of datatree
+     */
+    public int getNodeCount() {
+        return dataTree.getNodeCount();
+    }
+
+    /**
+     * the paths for ephemeral session id
+     * @param sessionId the session id for which paths match to
+     * @return the paths for a session id
+     */
+    public HashSet<String> getEphemerals(long sessionId) {
+        return dataTree.getEphemerals(sessionId);
+    }
+
+    /**
+     * set the last processed zxid in the datatree
+     * @param zxid the last processed zxid in the datatree
+     */
+    public void setlastProcessedZxid(long zxid) {
+        dataTree.lastProcessedZxid = zxid;
+    }
+
+    /**
+     * the process txn on the data
+     * @param hdr the txnheader for the txn
+     * @param txn the transaction that needs to be processed
+     * @return the result of processing the transaction on this
+     * datatree/zkdatabase
+     */
+    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
+        return dataTree.processTxn(hdr, txn);
+    }
+
+    /**
+     * stat the path
+     * @param path the path for which stat is to be done
+     * @param serverCnxn the servercnxn attached to this request
+     * @return the stat of this node
+     * @throws KeeperException.NoNodeException
+     */
+    public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException {
+        return dataTree.statNode(path, serverCnxn);
+    }
+
+    /**
+     * get the datanode for this path
+     * @param path the path to lookup
+     * @return the datanode for getting the path
+     */
+    public DataNode getNode(String path) {
+        return dataTree.getNode(path);
+    }
+
+    /**
+     * convert from long to the acl entry
+     * @param aclL the long for which to get the acl
+     * @return the acl corresponding to this long entry
+     */
+    public List<ACL> convertLong(Long aclL) {
+        return dataTree.convertLong(aclL);
+    }
+
+    /**
+     * get data and stat for a path
+     * @param path the path being queried
+     * @param stat passed through to the datatree (presumably populated
+     * with the node's stat there)
+     * @param watcher the watcher function
+     * @return the node's data as returned by the datatree
+     * @throws KeeperException.NoNodeException
+     */
+    public byte[] getData(String path, Stat stat, Watcher watcher)
+            throws KeeperException.NoNodeException {
+        return dataTree.getData(path, stat, watcher);
+    }
+
+    /**
+     * set watches on the datatree
+     * @param relativeZxid the relative zxid that client has seen
+     * @param dataWatches the data watches the client wants to reset
+     * @param existWatches the exists watches the client wants to reset
+     * @param childWatches the child watches the client wants to reset
+     * @param watcher the watcher function
+     */
+    public void setWatches(long relativeZxid, List<String> dataWatches,
+            List<String> existWatches, List<String> childWatches, Watcher watcher) {
+        dataTree.setWatches(relativeZxid, dataWatches, existWatches, childWatches, watcher);
+    }
+
+    /**
+     * get acl for a path
+     * @param path the path to query for acl
+     * @param stat the stat for the node
+     * @return the acl list for this path
+     * @throws NoNodeException
+     */
+    public List<ACL> getACL(String path, Stat stat) throws NoNodeException {
+        return dataTree.getACL(path, stat);
+    }
+
+    /**
+     * get children list for this path
+     * @param path the path of the node
+     * @param stat the stat of the node
+     * @param watcher the watcher function for this path
+     * @return the list of children for this path
+     * @throws KeeperException.NoNodeException
+     */
+    public List<String> getChildren(String path, Stat stat, Watcher watcher)
+            throws KeeperException.NoNodeException {
+        return dataTree.getChildren(path, stat, watcher);
+    }
+
+    /**
+     * check if the path is special or not
+     * @param path the input path
+     * @return true if path is special and false if not
+     */
+    public boolean isSpecialPath(String path) {
+        return dataTree.isSpecialPath(path);
+    }
+
+    /**
+     * get the acl size of the datatree
+     * @return the number of entries in the datatree's acl map (longKeyMap)
+     */
+    public int getAclSize() {
+        return dataTree.longKeyMap.size();
+    }
+
+    /**
+     * truncate the zkdatabase to this zxid.
+     * <p>
+     * Clears the in-memory state, truncates the on-disk log and then
+     * reloads the database from disk. The reload happens whether or not
+     * the truncation reports success.
+     * @param zxid the zxid to truncate zk database to
+     * @return true if the truncate is successful and false if not
+     * @throws IOException
+     */
+    public boolean truncateLog(long zxid) throws IOException {
+        clear();
+        boolean truncated = this.snapLog.truncateLog(zxid);
+        loadDataBase();
+        return truncated;
+    }
+
+    /**
+     * deserialize a snapshot from an input archive, replacing the
+     * current in-memory state
+     * @param ia the input archive you want to deserialize from
+     * @throws IOException
+     */
+    public void deserializeSnapshot(InputArchive ia) throws IOException {
+        clear();
+        SerializeUtils.deserializeSnapshot(getDataTree(), ia, getSessionWithTimeOuts());
+        initialized = true;
+    }
+
+    /**
+     * serialize the snapshot
+     * @param oa the output archive to which the snapshot needs to be serialized
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public void serializeSnapshot(OutputArchive oa) throws IOException,
+            InterruptedException {
+        SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts());
+    }
+
+    /**
+     * append to the underlying transaction log
+     * @param si the request to append
+     * @return true if the append was successful and false if not
+     * @throws IOException propagated from the underlying FileTxnSnapLog
+     */
+    public boolean append(Request si) throws IOException {
+        return this.snapLog.append(si);
+    }
+
+    /**
+     * roll the underlying log
+     * @throws IOException propagated from the underlying FileTxnSnapLog
+     */
+    public void rollLog() throws IOException {
+        this.snapLog.rollLog();
+    }
+
+    /**
+     * commit to the underlying transaction log
+     * @throws IOException
+     */
+    public void commit() throws IOException {
+        this.snapLog.commit();
+    }
+
+}
\ No newline at end of file
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Fri Dec 18 02:19:50 2009
@@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -96,16 +97,10 @@
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
-
- public static final int commitLogCount = 500;
- public int commitLogBuffer = 700;
- public final LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
- public long minCommittedLog, maxCommittedLog;
- private DataTreeBuilder treeBuilder;
- public DataTree dataTree;
protected SessionTracker sessionTracker;
private FileTxnSnapLog txnLogFactory = null;
- protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
+ private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
+ private ZKDatabase zkDb;
protected long hzxid = 0;
public final static Exception ok = new Exception("No prob");
protected RequestProcessor firstProcessor;
@@ -128,7 +123,7 @@
private final ServerStats serverStats;
void removeCnxn(ServerCnxn cnxn) {
- dataTree.removeCnxn(cnxn);
+ zkDb.removeCnxn(cnxn);
}
/**
@@ -140,27 +135,38 @@
*/
public ZooKeeperServer() {
serverStats = new ServerStats(this);
- treeBuilder = new BasicDataTreeBuilder();
}
/**
* Creates a ZooKeeperServer instance. It sets everything up, but doesn't
* actually start listening for clients until run() is invoked.
- *
+ *
- * @param dataDir the directory to put the data
+ * @param txnLogFactory the file transaction snapshot logging class
+ * @param tickTime the ticktime for the server
+ * @param treeBuilder the datatree builder; NOTE(review): not read by this
+ * constructor, so a custom DataTreeBuilder currently has no effect
+ * @param zkDb the in-memory database this server operates on
* @throws IOException
*/
- public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
- DataTreeBuilder treeBuilder) throws IOException {
+ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
+ DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
serverStats = new ServerStats(this);
- this.treeBuilder = treeBuilder;
-
this.txnLogFactory = txnLogFactory;
+ this.zkDb = zkDb;
this.tickTime = tickTime;
- LOG.info("Created server");
+ LOG.info("Created server with tickTime " + tickTime + " datadir " +
+ txnLogFactory.getDataDir() + " snapdir " + txnLogFactory.getSnapDir());
}
+ /**
+ * Creates a ZooKeeperServer instance backed by a new ZKDatabase built
+ * over the given txnLogFactory.
+ * @param txnLogFactory the file transaction snapshot logging class
+ * @param tickTime the ticktime for the server
+ * @param treeBuilder the datatree builder; NOTE(review): the delegated
+ * constructor does not read it, so it currently has no effect
+ * @throws IOException
+ */
+ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
+ DataTreeBuilder treeBuilder) throws IOException {
+ this(txnLogFactory, tickTime, treeBuilder, new ZKDatabase(txnLogFactory));
+ }
+
public ServerStats serverStats() {
return serverStats;
}
@@ -172,7 +178,7 @@
*/
public ZooKeeperServer(File snapDir, File logDir, int tickTime)
throws IOException {
- this(new FileTxnSnapLog(snapDir,logDir),
+ this( new FileTxnSnapLog(snapDir, logDir),
tickTime,new BasicDataTreeBuilder());
}
@@ -182,84 +188,51 @@
* @throws IOException
*/
public ZooKeeperServer(FileTxnSnapLog txnLogFactory,DataTreeBuilder treeBuilder) throws IOException {
- this(txnLogFactory, DEFAULT_TICK_TIME, treeBuilder);
+ this(txnLogFactory, DEFAULT_TICK_TIME, treeBuilder, new ZKDatabase(txnLogFactory));
}
/**
+ * get the zookeeper database for this server
+ * @return the zookeeper database for this server; NOTE(review): null
+ * when the server was built with the no-arg constructor and
+ * setZKDatabase has not been called
+ */
+ public ZKDatabase getZKDatabase() {
+ return this.zkDb;
+ }
+
+ /**
+ * set the zkdatabase for this zookeeper server
+ * @param zkDb the database this server uses from now on; only the
+ * reference is replaced, nothing is loaded from disk here
+ */
+ public void setZKDatabase(ZKDatabase zkDb) {
+ this.zkDb = zkDb;
+ }
+
+ /**
* Restore sessions and data
*/
public void loadData() throws IOException, InterruptedException {
- PlayBackListener listener=new PlayBackListener(){
- public void onTxnLoaded(TxnHeader hdr,Record txn){
- Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
- null, null);
- r.txn = txn;
- r.hdr = hdr;
- r.zxid = hdr.getZxid();
- addCommittedProposal(r);
- }
- };
- sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
- dataTree = treeBuilder.build();
- setZxid(txnLogFactory.restore(dataTree,sessionsWithTimeouts,listener));
+ // Load snapshot + txn log exactly once: ZKDatabase.loadDataBase() restores
+ // into its current datatree and appends every replayed txn to the
+ // in-memory committed log, so a second call would replay the same
+ // history again on top of the already restored state.
+ setZxid(zkDb.loadDataBase());
+ // Fetched once, outside the loop: the map is the same for every session,
+ // and assigning it here keeps the field non-null even with no sessions.
+ sessionsWithTimeouts = zkDb.getSessionWithTimeOuts();
// Clean up dead sessions
LinkedList<Long> deadSessions = new LinkedList<Long>();
- for (long session : dataTree.getSessions()) {
+ for (long session : zkDb.getSessions()) {
if (sessionsWithTimeouts.get(session) == null) {
deadSessions.add(session);
}
}
- dataTree.initialized = true;
+ zkDb.setDataTreeInit(true);
for (long session : deadSessions) {
// XXX: Is lastProcessedZxid really the best thing to use?
- killSession(session, dataTree.lastProcessedZxid);
+ killSession(session, zkDb.getDataTreeLastProcessedZxid());
}
// Make a clean snapshot
takeSnapshot();
}
- /**
- * maintains a list of last 500 or so committed requests. This is used for
- * fast follower synchronization.
- *
- * @param request committed request
- */
-
- public void addCommittedProposal(Request request) {
- synchronized (committedLog) {
- if (committedLog.size() > commitLogCount) {
- committedLog.removeFirst();
- minCommittedLog = committedLog.getFirst().packet.getZxid();
- }
- if (committedLog.size() == 0) {
- minCommittedLog = request.zxid;
- maxCommittedLog = request.zxid;
- }
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
- try {
- request.hdr.serialize(boa, "hdr");
- if (request.txn != null) {
- request.txn.serialize(boa, "txn");
- }
- baos.close();
- } catch (IOException e) {
- LOG.error("This really should be impossible", e);
- }
- QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
- baos.toByteArray(), null);
- Proposal p = new Proposal();
- p.packet = pp;
- p.request = request;
- committedLog.add(p);
- maxCommittedLog = p.packet.getZxid();
- }
- }
-
public void takeSnapshot(){
try {
- txnLogFactory.save(dataTree, sessionsWithTimeouts);
+ txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
} catch (IOException e) {
LOG.fatal("Severe unrecoverable error, exiting", e);
// This is a severe error that we cannot recover from,
@@ -268,18 +241,7 @@
}
}
- public void serializeSnapshot(OutputArchive oa) throws IOException,
- InterruptedException {
- SerializeUtils.serializeSnapshot(dataTree, oa, sessionsWithTimeouts);
- }
-
- public void deserializeSnapshot(InputArchive ia) throws IOException {
- sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
- dataTree = treeBuilder.build();
-
- SerializeUtils.deserializeSnapshot(dataTree,ia,sessionsWithTimeouts);
- }
-
+
/**
* This should be called from a synchronized block on this!
*/
@@ -312,7 +274,7 @@
}
protected void killSession(long sessionId, long zxid) {
- dataTree.killSession(sessionId, zxid);
+ zkDb.killSession(sessionId, zxid);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"ZooKeeperServer --- killSession: 0x"
@@ -358,7 +320,7 @@
MBeanRegistry.getInstance().register(jmxServerBean, null);
try {
- jmxDataTreeBean = new DataTreeBean(dataTree);
+ jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
@@ -371,7 +333,11 @@
}
public void startup() throws IOException, InterruptedException {
- if (dataTree == null) {
+ //check to see if zkDb is not null
+ if (zkDb == null) {
+ zkDb = new ZKDatabase(this.txnLogFactory);
+ }
+ if (!zkDb.isInitialized()) {
loadData();
}
createSessionTracker();
@@ -395,7 +361,7 @@
}
protected void createSessionTracker() {
- sessionTracker = new SessionTrackerImpl(this, sessionsWithTimeouts,
+ sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
tickTime, 1);
((SessionTrackerImpl)sessionTracker).start();
}
@@ -416,8 +382,8 @@
if (firstProcessor != null) {
firstProcessor.shutdown();
}
- if (dataTree != null) {
- dataTree.clear();
+ if (zkDb != null) {
+ zkDb.clear();
}
unregisterJMX();
@@ -638,7 +604,7 @@
* datatree
*/
public long getLastProcessedZxid() {
- return dataTree.lastProcessedZxid;
+ return zkDb.getDataTreeLastProcessedZxid();
}
/**
@@ -658,17 +624,9 @@
* @throws IOException
*/
public void truncateLog(long zxid) throws IOException {
- this.txnLogFactory.truncateLog(zxid);
+ this.zkDb.truncateLog(zxid);
}
-
- /**
- * the snapshot and logwriter for this instance
- * @return
- */
- public FileTxnSnapLog getLogWriter() {
- return this.txnLogFactory;
- }
-
+
public int getTickTime() {
return tickTime;
}
@@ -677,14 +635,6 @@
this.tickTime = tickTime;
}
- public DataTreeBuilder getTreeBuilder() {
- return treeBuilder;
- }
-
- public void setTreeBuilder(DataTreeBuilder treeBuilder) {
- this.treeBuilder = treeBuilder;
- }
-
public int getClientPort() {
return serverCnxnFactory != null ? serverCnxnFactory.ss.socket().getLocalPort() : -1;
}
@@ -700,4 +650,6 @@
public String getState() {
return "standalone";
}
+
+
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java Fri Dec 18 02:19:50 2009
@@ -61,7 +61,7 @@
/**
* deserialize a data tree from the most recent snapshot
* @return the zxid of the snapshot
- */
+ */
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException {
// we run through 100 snapshots (not all of them)
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java Fri Dec 18 02:19:50 2009
@@ -28,6 +28,7 @@
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.txn.TxnHeader;
@@ -56,8 +57,8 @@
* @throws IOException
*/
FollowerZooKeeperServer(FileTxnSnapLog logFactory,QuorumPeer self,
- DataTreeBuilder treeBuilder) throws IOException {
- super(logFactory, self.tickTime,treeBuilder);
+ DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
+ super(logFactory, self.tickTime,treeBuilder, zkDb);
this.self = self;
this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Fri Dec 18 02:19:50 2009
@@ -279,7 +279,7 @@
long epoch = self.getLastLoggedZxid() >> 32L;
epoch++;
zk.setZxid(epoch << 32L);
- zk.dataTree.lastProcessedZxid = zk.getZxid();
+ zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
synchronized(this){
lastProposed = zk.getZxid();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Fri Dec 18 02:19:50 2009
@@ -28,6 +28,7 @@
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.SessionTrackerImpl;
+import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -40,7 +41,7 @@
*/
public class LeaderZooKeeperServer extends ZooKeeperServer {
private QuorumPeer self;
-
+
CommitProcessor commitProcessor;
/**
@@ -49,8 +50,8 @@
* @throws IOException
*/
LeaderZooKeeperServer(FileTxnSnapLog logFactory,QuorumPeer self,
- DataTreeBuilder treeBuilder) throws IOException {
- super(logFactory, self.tickTime,treeBuilder);
+ DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
+ super(logFactory, self.tickTime,treeBuilder, zkDb);
this.self = self;
}
@@ -80,7 +81,7 @@
@Override
protected void createSessionTracker() {
- sessionTracker = new SessionTrackerImpl(this, sessionsWithTimeouts,
+ sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
tickTime, self.getId());
((SessionTrackerImpl)sessionTracker).start();
}
@@ -94,7 +95,7 @@
protected void registerJMX() {
// register with JMX
try {
- jmxDataTreeBean = new DataTreeBean(dataTree);
+ jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Fri Dec 18 02:19:50 2009
@@ -280,12 +280,13 @@
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
- zk.loadData();
}
else if (qp.getType() == Leader.SNAP) {
LOG.info("Getting a snapshot from leader");
// The leader is going to dump the database
- zk.deserializeSnapshot(leaderIs);
+ // clear our own database and read
+ zk.getZKDatabase().clear();
+ zk.getZKDatabase().deserializeSnapshot(leaderIs);
String signature = leaderIs.readString("signature");
if (!signature.equals("BenWasHere")) {
LOG.error("Missing signature. Got " + signature);
@@ -295,7 +296,7 @@
//we need to truncate the log to the lastzxid of the leader
LOG.warn("Truncating log to get in sync with the leader 0x"
+ Long.toHexString(qp.getZxid()));
- boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());
+ boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
if (!truncated) {
// not able to truncate the log
LOG.fatal("Not able to truncate the log "
@@ -303,7 +304,6 @@
System.exit(13);
}
- zk.loadData();
}
else {
LOG.fatal("Got unexpected packet from leader "
@@ -311,7 +311,7 @@
System.exit(13);
}
- zk.dataTree.lastProcessedZxid = newLeaderZxid;
+ zk.getZKDatabase().setlastProcessedZxid(newLeaderZxid);
}
ack.setZxid(newLeaderZxid & ~0xffffffffL);
writePacket(ack, true);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Fri Dec 18 02:19:50 2009
@@ -27,6 +27,7 @@
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
+import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jute.BinaryInputArchive;
@@ -254,13 +255,14 @@
/* we are sending the diff check if we have proposals in memory to be able to
* send a diff to the
*/
- synchronized(leader.zk.committedLog) {
- if (leader.zk.committedLog.size() != 0) {
- if ((leader.zk.maxCommittedLog >= peerLastZxid)
- && (leader.zk.minCommittedLog <= peerLastZxid)) {
+ LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
+ synchronized(proposals) {
+ if (proposals.size() != 0) {
+ if ((leader.zk.getZKDatabase().getmaxCommittedLog() >= peerLastZxid)
+ && (leader.zk.getZKDatabase().getminCommittedLog() <= peerLastZxid)) {
packetToSend = Leader.DIFF;
- zxidToSend = leader.zk.maxCommittedLog;
- for (Proposal propose: leader.zk.committedLog) {
+ zxidToSend = leader.zk.getZKDatabase().getmaxCommittedLog();
+ for (Proposal propose: proposals) {
if (propose.packet.getZxid() > peerLastZxid) {
queuePacket(propose.packet);
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
@@ -274,7 +276,7 @@
else {
logTxns = false;
}
- }
+ }
//check if we decided to send a diff or we need to send a truncate
// we avoid using epochs for truncating because epochs make things
@@ -282,11 +284,11 @@
// only if we know that there is a committed zxid in the queue that
// is less than the one the peer has we send a trunc else to make
// things simple we just send sanpshot.
- if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) {
+ if (logTxns && (peerLastZxid > leader.zk.getZKDatabase().getmaxCommittedLog())) {
// this is the only case that we are sure that
// we can ask the peer to truncate the log
packetToSend = Leader.TRUNC;
- zxidToSend = leader.zk.maxCommittedLog;
+ zxidToSend = leader.zk.getZKDatabase().getmaxCommittedLog();
updates = zxidToSend;
}
@@ -318,7 +320,7 @@
+ " zxid of leader is 0x"
+ Long.toHexString(leaderLastZxid));
// Dump data to peer
- leader.zk.serializeSnapshot(oa);
+ leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
}
bufferedOutput.flush();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java Fri Dec 18 02:19:50 2009
@@ -24,6 +24,7 @@
import org.apache.zookeeper.server.DataTreeBean;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerBean;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -36,8 +37,8 @@
protected QuorumPeer self;
public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
- DataTreeBuilder treeBuilder) throws IOException {
- super(logFactory,tickTime,treeBuilder);
+ DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
+ super(logFactory,tickTime,treeBuilder, zkDb);
}
/**
@@ -68,17 +69,9 @@
return self.getId();
}
- /**
- * Learners don't make use of this method, only Leaders.
- */
- @Override
- public void addCommittedProposal(Request request) {
- // Don't do anything!
- }
-
@Override
protected void createSessionTracker() {
- sessionTracker = new LearnerSessionTracker(this, sessionsWithTimeouts,
+ sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(),
self.getId());
}
@@ -92,7 +85,7 @@
protected void registerJMX() {
// register with JMX
try {
- jmxDataTreeBean = new DataTreeBean(dataTree);
+ jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java Fri Dec 18 02:19:50 2009
@@ -25,6 +25,7 @@
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
/**
@@ -48,8 +49,8 @@
new ConcurrentLinkedQueue<Request>();
ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
- DataTreeBuilder treeBuilder) throws IOException {
- super(logFactory, self.tickTime, treeBuilder);
+ DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
+ super(logFactory, self.tickTime, treeBuilder, zkDb);
this.self = self;
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Dec 18 02:19:50 2009
@@ -33,7 +33,9 @@
import org.apache.log4j.Logger;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.persistence.Util;
@@ -73,7 +75,15 @@
QuorumBean jmxQuorumBean;
LocalPeerBean jmxLocalPeerBean;
LeaderElectionBean jmxLeaderElectionBean;
-
+
+ /* ZKDatabase is a top level member of quorumpeer
+ * which will be used in all the zookeeperservers
+ * instantiated later. Also, it is created once on
+ * bootup and only thrown away in case of a truncate
+ * message from the leader
+ */
+ private ZKDatabase zkDb;
+
/**
* Create an instance of a quorum peer
*/
@@ -351,6 +361,7 @@
this.initLimit = initLimit;
this.syncLimit = syncLimit;
this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
+ this.zkDb = new ZKDatabase(this.logFactory);
if(quorumConfig == null)
this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
else this.quorumConfig = quorumConfig;
@@ -362,6 +373,12 @@
@Override
public synchronized void start() {
+ try {
+ zkDb.loadDataBase();
+ } catch(IOException ie) {
+ LOG.fatal("Unable to load database on disk", ie);
+ throw new RuntimeException("Unable to run quorum server ", ie);
+ }
cnxnFactory.start();
startLeaderElection();
super.start();
@@ -447,27 +464,16 @@
* @return the highest zxid for this host
*/
public long getLastLoggedZxid() {
- /*
- * it is possible to have the last zxid with just a snapshot and no log
- * related to it. one example is during upgrade wherein the there is no
- * corresponding log to the snapshot. in that case just use the snapshot
- * zxid
- */
-
- File lastSnapshot = null;
- long maxZxid = -1L;
- long maxLogZxid = logFactory.getLastLoggedZxid();
+ long lastLogged= -1L;
try {
- lastSnapshot = logFactory.findMostRecentSnapshot();
- if (lastSnapshot != null) {
- maxZxid = Math.max(Util.getZxidFromName(lastSnapshot.getName(),
- "snapshot"), maxLogZxid);
- }
- } catch (IOException ie) {
- LOG.warn("Exception finding last snapshot ", ie);
- maxZxid = maxLogZxid;
+ if (!zkDb.isInitialized()) {
+ zkDb.loadDataBase();
+ }
+ lastLogged = zkDb.getDataTreeLastProcessedZxid();
+ } catch(IOException ie) {
+ LOG.warn("Unable to load database ", ie);
}
- return maxZxid;
+ return lastLogged;
}
public Follower follower;
@@ -476,17 +482,17 @@
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
return new Follower(this, new FollowerZooKeeperServer(logFactory,
- this,new ZooKeeperServer.BasicDataTreeBuilder()));
+ this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
return new Leader(this, new LeaderZooKeeperServer(logFactory,
- this,new ZooKeeperServer.BasicDataTreeBuilder()));
+ this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
return new Observer(this, new ObserverZooKeeperServer(logFactory,
- this, new ZooKeeperServer.BasicDataTreeBuilder()));
+ this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
private Election createElectionAlgorithm(int electionAlgorithm){
@@ -878,4 +884,12 @@
public FileTxnSnapLog getTxnFactory() {
return this.logFactory;
}
+
+ /**
+ * set zk database for this node
+ * @param database
+ */
+ public void setZKDatabase(ZKDatabase database) {
+ this.zkDb = database;
+ }
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Fri Dec 18 02:19:50 2009
@@ -25,6 +25,7 @@
import org.apache.log4j.Logger;
import org.apache.zookeeper.jmx.ManagedUtil;
import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
@@ -133,6 +134,7 @@
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setCnxnFactory(cnxnFactory);
+ quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setPeerType(config.getPeerType());
quorumPeer.start();
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java Fri Dec 18 02:19:50 2009
@@ -106,7 +106,7 @@
zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
- assertTrue("size of the acl map ", (1 == zks.dataTree.longKeyMap.size()));
+ assertTrue("size of the acl map ", (1 == zks.getZKDatabase().getAclSize()));
for (int j = 100; j < 200; j++) {
path = "/" + j;
ACL acl = new ACL();
@@ -119,7 +119,7 @@
list.add(acl);
zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT);
}
- assertTrue("size of the acl map ", (101 == zks.dataTree.longKeyMap.size()));
+ assertTrue("size of the acl map ", (101 == zks.getZKDatabase().getAclSize()));
// now shutdown the server and restart it
f.shutdown();
assertTrue("waiting for server down",
@@ -138,7 +138,7 @@
TimeUnit.MILLISECONDS);
assertTrue("count == 0", startSignal.getCount() == 0);
- assertTrue("acl map ", (101 == zks.dataTree.longKeyMap.size()));
+ assertTrue("acl map ", (101 == zks.getZKDatabase().getAclSize()));
for (int j = 200; j < 205; j++) {
path = "/" + j;
ACL acl = new ACL();
@@ -151,7 +151,7 @@
list.add(acl);
zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT);
}
- assertTrue("acl map ", (106 == zks.dataTree.longKeyMap.size()));
+ assertTrue("acl map ", (106 == zks.getZKDatabase().getAclSize()));
zk.close();
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java?rev=892111&r1=892110&r2=892111&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java Fri Dec 18 02:19:50 2009
@@ -187,8 +187,8 @@
LOG.error("Null listener when initializing cnx manager");
}
- cnxManager.toSend(new Long(1), createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1));
+ cnxManager.toSend(new Long(1), createMsg(ServerState.LOOKING.ordinal(), 0, 0, 1));
cnxManager.recvQueue.take();
- cnxManager.toSend(new Long(1), createMsg(ServerState.FOLLOWING.ordinal(), 1, -1, 1));
+ cnxManager.toSend(new Long(1), createMsg(ServerState.FOLLOWING.ordinal(), 1, 0, 1));
}
}
\ No newline at end of file
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ZkDatabaseCorruptionTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ZkDatabaseCorruptionTest.java?rev=892111&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ZkDatabaseCorruptionTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ZkDatabaseCorruptionTest.java Fri Dec 18 02:19:50 2009
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.Before;
+
+/**
+ * Checks that a peer whose snapshots have all been corrupted fails to start,
+ * while the remaining peers still come up and accept writes.
+ */
+public class ZkDatabaseCorruptionTest extends QuorumBase {
+    protected static final Logger LOG = Logger.getLogger(ZkDatabaseCorruptionTest.class);
+    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
+
+    private final QuorumBase qb = new QuorumBase();
+
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        qb.setUp();
+    }
+
+    // NOTE(review): qb is never torn down; the test shuts s1-s4 down itself,
+    // so a failure part-way through leaves the servers running.
+    @Override
+    protected void tearDown() throws Exception {
+    }
+
+    /** Overwrites the first bytes of {@code f} with garbage. */
+    private void corruptFile(File f) throws IOException {
+        RandomAccessFile outFile = new RandomAccessFile(f, "rw");
+        try {
+            outFile.write("fail servers".getBytes());
+        } finally {
+            outFile.close();
+        }
+    }
+
+    /**
+     * Corrupts every file in {@code snapDir} whose name starts with "snapshot".
+     *
+     * @return the number of files that were corrupted
+     */
+    private int corruptAllSnapshots(File snapDir) throws IOException {
+        File[] listFiles = snapDir.listFiles();
+        if (listFiles == null) {
+            throw new IOException("Unable to list " + snapDir);
+        }
+        int corrupted = 0;
+        for (File f : listFiles) {
+            if (f.getName().startsWith("snapshot")) {
+                corruptFile(f);
+                corrupted++;
+            }
+        }
+        return corrupted;
+    }
+
+    /** Asserts that the first {@code count} entries of {@code hostPorts} accept connections. */
+    private void assertServersUp(String[] hostPorts, int count) {
+        for (int i = 0; i < count; i++) {
+            String hp = hostPorts[i];
+            assertTrue("waiting for server up",
+                    ClientBase.waitForServerUp(hp, CONNECTION_TIMEOUT));
+            LOG.info(hp + " is accepting client connections");
+        }
+    }
+
+    public void testCorruption() throws Exception {
+        String[] hostPorts = qb.hostPort.split(",");
+        assertServersUp(hostPorts, hostPorts.length);
+        ZooKeeper zk = qb.createClient();
+        SyncRequestProcessor.setSnapCount(100);
+        for (int i = 0; i < 2000; i++) {
+            zk.create("/0-" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        zk.close();
+        // the test relies on the highest-id peer (s5) being the leader;
+        // corrupt every snapshot in that peer's snapshot directory
+        if (qb.s5.getPeerState() != ServerState.LEADING) {
+            throw new Exception("the last server is not the leader");
+        }
+        QuorumPeer leader = qb.s5;
+        FileTxnSnapLog snapLog = leader.getTxnFactory();
+        File snapDir = snapLog.getSnapDir();
+        assertTrue("no snapshots to corrupt in " + snapDir,
+                corruptAllSnapshots(snapDir) > 0);
+        qb.shutdownServers();
+        qb.setupServers();
+        qb.s1.start();
+        qb.s2.start();
+        qb.s3.start();
+        qb.s4.start();
+        try {
+            qb.s5.start();
+            fail("a peer whose snapshots are all corrupt must not start");
+        } catch (RuntimeException re) {
+            LOG.info("Got an error: expected", re);
+        }
+        // the four healthy servers must still come up
+        assertServersUp(hostPorts, 4);
+
+        zk = qb.createClient();
+        for (int i = 2000; i < 4000; i++) {
+            zk.create("/0-" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        zk.close();
+        qb.s1.shutdown();
+        qb.s2.shutdown();
+        qb.s3.shutdown();
+        qb.s4.shutdown();
+    }
+}
\ No newline at end of file
|