Author: phunt
Date: Thu Oct 20 18:02:18 2011
New Revision: 1186967
URL: http://svn.apache.org/viewvc?rev=1186967&view=rev
Log:
ZOOKEEPER-1221. Provide accessors for Request.{hdr|txn} (Thomas Koch via phunt)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu Oct 20 18:02:18 2011
@@ -60,6 +60,8 @@ IMPROVEMENTS:
ZOOKEEPER-1193. Remove upgrade code (Thomas Koch via phunt)
+ ZOOKEEPER-1221. Provide accessors for Request.{hdr|txn} (Thomas Koch via phunt)
+
Release 3.4.0 -
Non-backward compatible changes:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Thu Oct 20 18:02:18 2011
@@ -106,17 +106,17 @@ public class FinalRequestProcessor imple
zks.outstandingChangesForPath.remove(cr.path);
}
}
- if (request.hdr != null) {
- rc = zks.getZKDatabase().processTxn(request.hdr, request.txn);
+ if (request.getHdr() != null) {
+ rc = zks.getZKDatabase().processTxn(request.getHdr(), request.getTxn());
if (request.type == OpCode.createSession) {
- if (request.txn instanceof CreateSessionTxn) {
- CreateSessionTxn cst = (CreateSessionTxn) request.txn;
+ if (request.getTxn() instanceof CreateSessionTxn) {
+ CreateSessionTxn cst = (CreateSessionTxn) request.getTxn();
zks.sessionTracker.addSession(request.sessionId, cst
.getTimeOut());
} else {
LOG.warn("*****>>>>> Got "
- + request.txn.getClass() + " "
- + request.txn.toString());
+ + request.getTxn().getClass() + " "
+ + request.getTxn().toString());
}
} else if (request.type == OpCode.closeSession) {
zks.sessionTracker.removeSession(request.sessionId);
@@ -128,7 +128,7 @@ public class FinalRequestProcessor imple
}
}
- if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
+ if (request.getHdr() != null && request.getHdr().getType() == OpCode.closeSession) {
ServerCnxnFactory scxn = zks.getServerCnxnFactory();
// this might be possible since
// we might just be playing diffs from the leader
@@ -153,9 +153,9 @@ public class FinalRequestProcessor imple
Record rsp = null;
boolean closeSession = false;
try {
- if (request.hdr != null && request.hdr.getType() == OpCode.error) {
+ if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
throw KeeperException.create(KeeperException.Code.get((
- (ErrorTxn) request.txn).getErr()));
+ (ErrorTxn) request.getTxn()).getErr()));
}
KeeperException ke = request.getException();
@@ -308,8 +308,8 @@ public class FinalRequestProcessor imple
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
- zks.getZKDatabase().setWatches(relativeZxid,
- setWatches.getDataWatches(),
+ zks.getZKDatabase().setWatches(relativeZxid,
+ setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(), cnxn);
break;
@@ -320,7 +320,7 @@ public class FinalRequestProcessor imple
ByteBufferInputStream.byteBuffer2Record(request.request,
getACLRequest);
Stat stat = new Stat();
- List<ACL> acl =
+ List<ACL> acl =
zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
rsp = new GetACLResponse(acl, stat);
break;
@@ -337,9 +337,9 @@ public class FinalRequestProcessor imple
Long aclG;
synchronized(n) {
aclG = n.acl;
-
+
}
- PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
+ PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
ZooDefs.Perms.READ,
request.authInfo);
List<String> children = zks.getZKDatabase().getChildren(
@@ -362,7 +362,7 @@ public class FinalRequestProcessor imple
synchronized(n) {
aclG = n.acl;
}
- PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
+ PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
ZooDefs.Perms.READ,
request.authInfo);
List<String> children = zks.getZKDatabase().getChildren(
@@ -377,7 +377,7 @@ public class FinalRequestProcessor imple
// down the connection otw ZOOKEEPER-710 might happen
// ie client on slow follower starts to renew session, fails
// before this completes, then tries the fast follower (leader)
- // and is successful, however the initial renew is then
+ // and is successful, however the initial renew is then
// successfully fwd/processed by the leader and as a result
// the client and leader disagree on where the client is most
// recently attached (and therefore invalid SESSION MOVED generated)
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Thu Oct 20 18:02:18 2011
@@ -290,7 +290,7 @@ public class PrepRequestProcessor extend
* @param record
*/
protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException {
- request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), type);
+ request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), type));
switch (type) {
case OpCode.create:
@@ -335,17 +335,17 @@ public class PrepRequestProcessor extend
throw new KeeperException.NoChildrenForEphemeralsException(path);
}
int newCversion = parentRecord.stat.getCversion()+1;
- request.txn = new CreateTxn(path, createRequest.getData(), listACL,
- createMode.isEphemeral(), newCversion);
+ request.setTxn(new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(),
+ newCversion));
StatPersisted s = new StatPersisted();
if (createMode.isEphemeral()) {
s.setEphemeralOwner(request.sessionId);
}
- parentRecord = parentRecord.duplicate(request.hdr.getZxid());
+ parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount++;
parentRecord.stat.setCversion(newCversion);
addChangeRecord(parentRecord);
- addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s, 0, listACL));
+ addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
break;
case OpCode.delete:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
@@ -364,11 +364,11 @@ public class PrepRequestProcessor extend
if (nodeRecord.childCount > 0) {
throw new KeeperException.NotEmptyException(path);
}
- request.txn = new DeleteTxn(path);
- parentRecord = parentRecord.duplicate(request.hdr.getZxid());
+ request.setTxn(new DeleteTxn(path));
+ parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount--;
addChangeRecord(parentRecord);
- addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, null, -1, null));
+ addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null));
break;
case OpCode.setData:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
@@ -377,8 +377,8 @@ public class PrepRequestProcessor extend
nodeRecord = getRecordForPath(path);
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
int newVersion = checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
- request.txn = new SetDataTxn(path, setDataRequest.getData(), newVersion);
- nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
+ request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
+ nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
nodeRecord.stat.setVersion(newVersion);
addChangeRecord(nodeRecord);
break;
@@ -393,15 +393,15 @@ public class PrepRequestProcessor extend
nodeRecord = getRecordForPath(path);
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.ADMIN, request.authInfo);
newVersion = checkAndIncVersion(nodeRecord.stat.getAversion(), setAclRequest.getVersion(), path);
- request.txn = new SetACLTxn(path, listACL, newVersion);
- nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
+ request.setTxn(new SetACLTxn(path, listACL, newVersion));
+ nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
nodeRecord.stat.setAversion(newVersion);
addChangeRecord(nodeRecord);
break;
case OpCode.createSession:
request.request.rewind();
int to = request.request.getInt();
- request.txn = new CreateSessionTxn(to);
+ request.setTxn(new CreateSessionTxn(to));
request.request.rewind();
zks.sessionTracker.addSession(request.sessionId, to);
zks.setOwner(request.sessionId, request.getOwner());
@@ -423,8 +423,7 @@ public class PrepRequestProcessor extend
}
}
for (String path2Delete : es) {
- addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
- path2Delete, null, 0, null));
+ addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null));
}
}
LOG.info("Processed session termination for sessionid: 0x"
@@ -436,8 +435,8 @@ public class PrepRequestProcessor extend
path = checkVersionRequest.getPath();
nodeRecord = getRecordForPath(path);
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.READ, request.authInfo);
- request.txn = new CheckVersionTxn(path,
- checkAndIncVersion(nodeRecord.stat.getVersion(), checkVersionRequest.getVersion(), path));
+ request.setTxn(new CheckVersionTxn(path, checkAndIncVersion(nodeRecord.stat.getVersion(),
+ checkVersionRequest.getVersion(), path)));
break;
}
}
@@ -459,8 +458,8 @@ public class PrepRequestProcessor extend
protected void pRequest(Request request) {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
- request.hdr = null;
- request.txn = null;
+ request.setHdr(null);
+ request.setTxn(null);
try {
switch (request.type) {
@@ -509,8 +508,8 @@ public class PrepRequestProcessor extend
* would be confusing in the logfiles.
*/
if (ke != null) {
- request.hdr.setType(OpCode.error);
- request.txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
+ request.getHdr().setType(OpCode.error);
+ request.setTxn(new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue()));
}
/* Prep the request and convert to a Txn */
@@ -521,8 +520,8 @@ public class PrepRequestProcessor extend
if (ke == null) {
ke = e;
}
- request.hdr.setType(OpCode.error);
- request.txn = new ErrorTxn(e.code().intValue());
+ request.getHdr().setType(OpCode.error);
+ request.setTxn(new ErrorTxn(e.code().intValue()));
LOG.error(">>>> Got user-level KeeperException when processing "
+ request.toString()
+ " Error Path:" + e.getPath()
@@ -540,14 +539,14 @@ public class PrepRequestProcessor extend
// not sure how else to get the txn stored into our list.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
- request.txn.serialize(boa, "request") ;
+ request.getTxn().serialize(boa, "request") ;
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
- txns.add(new Txn(request.hdr.getType(), bb.array()));
+ txns.add(new Txn(request.getHdr().getType(), bb.array()));
}
- request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), request.type);
- request.txn = new MultiTxn(txns);
+ request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), request.type));
+ request.setTxn(new MultiTxn(txns));
break;
@@ -571,9 +570,9 @@ public class PrepRequestProcessor extend
break;
}
} catch (KeeperException e) {
- if (request.hdr != null) {
- request.hdr.setType(OpCode.error);
- request.txn = new ErrorTxn(e.code().intValue());
+ if (request.getHdr() != null) {
+ request.getHdr().setType(OpCode.error);
+ request.setTxn(new ErrorTxn(e.code().intValue()));
}
LOG.info("Got user-level KeeperException when processing "
+ request.toString()
@@ -597,9 +596,9 @@ public class PrepRequestProcessor extend
}
LOG.error("Dumping request buffer: 0x" + sb.toString());
- if (request.hdr != null) {
- request.hdr.setType(OpCode.error);
- request.txn = new ErrorTxn(Code.MARSHALLINGERROR.intValue());
+ if (request.getHdr() != null) {
+ request.getHdr().setType(OpCode.error);
+ request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}
request.zxid = zks.getZxid();
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java Thu Oct 20 18:02:18 2011
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.jute.Record;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.Id;
@@ -35,18 +33,9 @@ import org.apache.zookeeper.txn.TxnHeade
* onto the request as it is processed.
*/
public class Request {
- public final static Request requestOfDeath = new Request(null, 0, 0, 0,
- null, null);
+ public final static Request requestOfDeath = new Request(null, 0, 0, 0, null, null);
- /**
- * @param cnxn
- * @param sessionId
- * @param xid
- * @param type
- * @param bb
- */
- public Request(ServerCnxn cnxn, long sessionId, int xid, int type,
- ByteBuffer bb, List<Id> authInfo) {
+ public Request(ServerCnxn cnxn, long sessionId, int xid, int type, ByteBuffer bb, List<Id> authInfo) {
this.cnxn = cnxn;
this.sessionId = sessionId;
this.cxid = xid;
@@ -55,6 +44,18 @@ public class Request {
this.authInfo = authInfo;
}
+ public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, long zxid) {
+ this.sessionId = sessionId;
+ this.cxid = xid;
+ this.type = type;
+ this.hdr = hdr;
+ this.txn = txn;
+ this.zxid = zxid;
+ this.request = null;
+ this.cnxn = null;
+ this.authInfo = null;
+ }
+
public final long sessionId;
public final int cxid;
@@ -65,31 +66,47 @@ public class Request {
public final ServerCnxn cnxn;
- public TxnHeader hdr;
+ private TxnHeader hdr;
- public Record txn;
+ private Record txn;
public long zxid = -1;
public final List<Id> authInfo;
public final long createTime = System.currentTimeMillis();
-
+
private Object owner;
-
+
private KeeperException e;
public Object getOwner() {
return owner;
}
-
+
public void setOwner(Object owner) {
this.owner = owner;
}
+ public TxnHeader getHdr() {
+ return hdr;
+ }
+
+ public void setHdr(TxnHeader hdr) {
+ this.hdr = hdr;
+ }
+
+ public Record getTxn() {
+ return txn;
+ }
+
+ public void setTxn(Record txn) {
+ this.txn = txn;
+ }
+
/**
* is the packet type a valid packet in zookeeper
- *
+ *
* @param type
* the type of the packet
* @return true if a valid packet, false if not
@@ -143,7 +160,7 @@ public class Request {
return false;
}
}
-
+
static String op2String(int op) {
switch (op) {
case OpCode.notification:
@@ -232,7 +249,7 @@ public class Request {
public void setException(KeeperException e) {
this.e = e;
}
-
+
public KeeperException getException() {
return e;
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Thu Oct 20 18:02:18 2011
@@ -203,11 +203,7 @@ public class ZKDatabase {
public long loadDataBase() throws IOException {
PlayBackListener listener=new PlayBackListener(){
public void onTxnLoaded(TxnHeader hdr,Record txn){
- Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
- null, null);
- r.txn = txn;
- r.hdr = hdr;
- r.zxid = hdr.getZxid();
+ Request r = new Request(0, hdr.getCxid(),hdr.getType(), hdr, txn, hdr.getZxid());
addCommittedProposal(r);
}
};
@@ -239,9 +235,9 @@ public class ZKDatabase {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
try {
- request.hdr.serialize(boa, "hdr");
- if (request.txn != null) {
- request.txn.serialize(boa, "txn");
+ request.getHdr().serialize(boa, "hdr");
+ if (request.getTxn() != null) {
+ request.getTxn().serialize(boa, "txn");
}
baos.close();
} catch (IOException e) {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java Thu Oct 20 18:02:18 2011
@@ -40,38 +40,38 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This is a helper class
- * above the implementations
- * of txnlog and snapshot
+ * This is a helper class
+ * above the implementations
+ * of txnlog and snapshot
* classes
*/
public class FileTxnSnapLog {
- //the direcotry containing the
+ //the direcotry containing the
//the transaction logs
- File dataDir;
- //the directory containing the
+ File dataDir;
+ //the directory containing the
//the snapshot directory
File snapDir;
TxnLog txnLog;
SnapShot snapLog;
public final static int VERSION = 2;
public final static String version = "version-";
-
+
private static final Logger LOG = LoggerFactory.getLogger(FileTxnSnapLog.class);
-
+
/**
* This listener helps
* the external apis calling
* restore to gather information
- * while the data is being
+ * while the data is being
* restored.
*/
public interface PlayBackListener {
void onTxnLoaded(TxnHeader hdr, Record rec);
}
-
+
/**
- * the constructor which takes the datadir and
+ * the constructor which takes the datadir and
* snapdir.
* @param dataDir the trasaction directory
* @param snapDir the snapshot directory
@@ -94,7 +94,7 @@ public class FileTxnSnapLog {
txnLog = new FileTxnLog(this.dataDir);
snapLog = new FileSnap(this.snapDir);
}
-
+
/**
* get the datadir used by this filetxn
* snap log
@@ -103,28 +103,28 @@ public class FileTxnSnapLog {
public File getDataDir() {
return this.dataDir;
}
-
+
/**
- * get the snap dir used by this
+ * get the snap dir used by this
* filetxn snap log
* @return the snap dir
*/
public File getSnapDir() {
return this.snapDir;
}
-
+
/**
- * this function restores the server
- * database after reading from the
+ * this function restores the server
+ * database after reading from the
* snapshots and transaction logs
* @param dt the datatree to be restored
* @param sessions the sessions to be restored
- * @param listener the playback listener to run on the
+ * @param listener the playback listener to run on the
* database restoration
* @return the highest zxid restored
* @throws IOException
*/
- public long restore(DataTree dt, Map<Long, Integer> sessions,
+ public long restore(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);
@@ -132,11 +132,11 @@ public class FileTxnSnapLog {
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
while (true) {
- // iterator points to
+ // iterator points to
// the first valid txn when initialized
hdr = itr.getHeader();
if (hdr == null) {
- //empty logs
+ //empty logs
return dt.lastProcessedZxid;
}
if (hdr.getZxid() < highestZxid && highestZxid != 0) {
@@ -153,12 +153,12 @@ public class FileTxnSnapLog {
hdr.getType() + " error: " + e.getMessage());
}
listener.onTxnLoaded(hdr, itr.getTxn());
- if (!itr.next())
+ if (!itr.next())
break;
}
return highestZxid;
}
-
+
/**
* process the transaction on the datatree
* @param hdr the hdr of the transaction
@@ -231,7 +231,7 @@ public class FileTxnSnapLog {
" : error: " + rc.err);
}
}
-
+
/**
* the last logged zxid on the transaction logs
* @return the last logged zxid
@@ -256,7 +256,7 @@ public class FileTxnSnapLog {
File snapshot=new File(
snapDir, Util.makeSnapshotName(lastZxid));
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshot);
-
+
}
/**
@@ -270,11 +270,11 @@ public class FileTxnSnapLog {
FileTxnLog txnLog = new FileTxnLog(dataDir);
return txnLog.truncate(zxid);
}
-
+
/**
* the most recent snapshot in the snapshot
* directory
- * @return the file that contains the most
+ * @return the file that contains the most
* recent snapshot
* @throws IOException
*/
@@ -282,7 +282,7 @@ public class FileTxnSnapLog {
FileSnap snaplog = new FileSnap(snapDir);
return snaplog.findMostRecentSnapshot();
}
-
+
/**
* the n most recent snapshots
* @param n the number of recent snapshots
@@ -297,8 +297,8 @@ public class FileTxnSnapLog {
/**
* get the snapshot logs that are greater than
- * the given zxid
- * @param zxid the zxid that contains logs greater than
+ * the given zxid
+ * @param zxid the zxid that contains logs greater than
* zxid
* @return
*/
@@ -309,11 +309,11 @@ public class FileTxnSnapLog {
/**
* append the request to the transaction logs
* @param si the request to be appended
- * returns true iff something appended, otw false
+ * returns true iff something appended, otw false
* @throws IOException
*/
public boolean append(Request si) throws IOException {
- return txnLog.append(si.hdr, si.txn);
+ return txnLog.append(si.getHdr(), si.getTxn());
}
/**
@@ -326,12 +326,12 @@ public class FileTxnSnapLog {
/**
* roll the transaction logs
- * @throws IOException
+ * @throws IOException
*/
public void rollLog() throws IOException {
txnLog.rollLog();
}
-
+
/**
* close the transaction log files
* @throws IOException
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Thu Oct 20 18:02:18 2011
@@ -47,7 +47,6 @@ public class CommitProcessor extends Thr
LinkedList<Request> committedRequests = new LinkedList<Request>();
RequestProcessor nextProcessor;
- ArrayList<Request> toProcess = new ArrayList<Request>();
/**
* This flag indicates whether we need to wait for a response to come back from the
@@ -66,24 +65,21 @@ public class CommitProcessor extends Thr
@Override
public void run() {
+ ArrayList<Request> toProcess = new ArrayList<Request>();
try {
Request nextPending = null;
while (!finished) {
- int len = toProcess.size();
- for (int i = 0; i < len; i++) {
- nextProcessor.processRequest(toProcess.get(i));
+ for (Request request : toProcess) {
+ nextProcessor.processRequest(request);
}
toProcess.clear();
synchronized (this) {
- if ((queuedRequests.size() == 0 || nextPending != null)
- && committedRequests.size() == 0) {
+ if ((queuedRequests.isEmpty() || nextPending != null) && committedRequests.isEmpty()) {
wait();
continue;
}
- // First check and see if the commit came in for the pending
- // request
- if ((queuedRequests.size() == 0 || nextPending != null)
- && committedRequests.size() > 0) {
+ // First check and see if the commit came in for the pending request
+ if ((queuedRequests.isEmpty() || nextPending != null) && !committedRequests.isEmpty()) {
Request r = committedRequests.remove();
/*
* We match with nextPending so that we can move to the
@@ -96,28 +92,26 @@ public class CommitProcessor extends Thr
&& nextPending.cxid == r.cxid) {
// we want to send our version of the request.
// the pointer to the connection in the request
- nextPending.hdr = r.hdr;
- nextPending.txn = r.txn;
+ nextPending.setHdr(r.getHdr());
+ nextPending.setTxn(r.getTxn());
nextPending.zxid = r.zxid;
toProcess.add(nextPending);
nextPending = null;
} else {
- // this request came from someone else so just
- // send the commit packet
+ // this request came from someone else so just send the commit packet
toProcess.add(r);
}
}
}
- // We haven't matched the pending requests, so go back to
- // waiting
+ // We haven't matched the pending requests, so go back to waiting
if (nextPending != null) {
continue;
}
synchronized (this) {
// Process the next requests in the queuedRequests
- while (nextPending == null && queuedRequests.size() > 0) {
+ while (nextPending == null && !queuedRequests.isEmpty()) {
Request request = queuedRequests.remove();
switch (request.type) {
case OpCode.create:
@@ -166,7 +160,6 @@ public class CommitProcessor extends Thr
}
synchronized public void processRequest(Request request) {
- // request.addRQRec(">commit");
if (LOG.isDebugEnabled()) {
LOG.debug("Processing request:: " + request);
}
@@ -188,5 +181,4 @@ public class CommitProcessor extends Thr
nextProcessor.shutdown();
}
}
-
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java Thu Oct 20 18:02:18 2011
@@ -37,7 +37,7 @@ import org.apache.zookeeper.txn.TxnHeade
* Just like the standard ZooKeeperServer. We just replace the request
* processors: FollowerRequestProcessor -> CommitProcessor ->
* FinalRequestProcessor
- *
+ *
* A SyncRequestProcessor is also spawned off to log proposals from the leader.
*/
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
@@ -52,7 +52,7 @@ public class FollowerZooKeeperServer ext
* Pending sync requests
*/
ConcurrentLinkedQueue<Request> pendingSyncs;
-
+
/**
* @param port
* @param dataDir
@@ -67,7 +67,7 @@ public class FollowerZooKeeperServer ext
public Follower getFollower(){
return self.follower;
- }
+ }
@Override
protected void setupRequestProcessors() {
@@ -85,11 +85,7 @@ public class FollowerZooKeeperServer ext
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
public void logRequest(TxnHeader hdr, Record txn) {
- Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
- hdr.getType(), null, null);
- request.hdr = hdr;
- request.txn = txn;
- request.zxid = hdr.getZxid();
+ Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
@@ -97,7 +93,7 @@ public class FollowerZooKeeperServer ext
}
/**
- * When a COMMIT message is received, eventually this method is called,
+ * When a COMMIT message is received, eventually this method is called,
* which matches up the zxid from the COMMIT with (hopefully) the head of
* the pendingTxns queue and hands it to the commitProcessor to commit.
* @param zxid - must correspond to the head of pendingTxns if it exists
@@ -118,22 +114,22 @@ public class FollowerZooKeeperServer ext
Request request = pendingTxns.remove();
commitProcessor.commit(request);
}
-
+
synchronized public void sync(){
if(pendingSyncs.size() ==0){
LOG.warn("Not expecting a sync.");
return;
}
-
+
Request r = pendingSyncs.remove();
commitProcessor.commit(r);
}
-
+
@Override
public int getGlobalOutstandingLimit() {
return super.getGlobalOutstandingLimit() / (self.getQuorumSize() - 1);
}
-
+
@Override
public void shutdown() {
LOG.info("Shutting down");
@@ -151,7 +147,7 @@ public class FollowerZooKeeperServer ext
e);
}
}
-
+
@Override
public String getState() {
return "follower";
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Oct 20 18:02:18 2011
@@ -51,7 +51,7 @@ import org.apache.zookeeper.server.util.
*/
public class Leader {
private static final Logger LOG = LoggerFactory.getLogger(Leader.class);
-
+
static final private boolean nodelay = System.getProperty("leader.nodelay", "true").equals("true");
static {
LOG.info("TCP NoDelay set to: " + nodelay);
@@ -76,27 +76,27 @@ public class Leader {
// the follower acceptor thread
LearnerCnxAcceptor cnxAcceptor;
-
+
// list of all the followers
public final HashSet<LearnerHandler> learners =
new HashSet<LearnerHandler>();
- // list of followers that are ready to follow (i.e synced with the leader)
+ // list of followers that are ready to follow (i.e synced with the leader)
public final HashSet<LearnerHandler> forwardingFollowers =
new HashSet<LearnerHandler>();
-
+
protected final HashSet<LearnerHandler> observingLearners =
new HashSet<LearnerHandler>();
-
+
//Pending sync requests
public final HashMap<Long,List<LearnerSyncRequest>> pendingSyncs =
new HashMap<Long,List<LearnerSyncRequest>>();
-
+
//Follower counter
final AtomicLong followerCounter = new AtomicLong(-1);
/**
* Adds peer to the leader.
- *
+ *
* @param learner
* instance of learner handle
*/
@@ -108,13 +108,13 @@ public class Leader {
/**
* Remove the learner from the learner list
- *
+ *
* @param peer
*/
void removeLearnerHandler(LearnerHandler peer) {
synchronized (forwardingFollowers) {
- forwardingFollowers.remove(peer);
- }
+ forwardingFollowers.remove(peer);
+ }
synchronized (learners) {
learners.remove(peer);
}
@@ -123,9 +123,9 @@ public class Leader {
boolean isLearnerSynced(LearnerHandler peer){
synchronized (forwardingFollowers) {
return forwardingFollowers.contains(peer);
- }
+ }
}
-
+
ServerSocket ss;
Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
@@ -144,22 +144,22 @@ public class Leader {
* This message is for follower to expect diff
*/
final static int DIFF = 13;
-
+
/**
- * This is for follower to truncate its logs
+ * This is for follower to truncate its logs
*/
final static int TRUNC = 14;
-
+
/**
* This is for follower to download the snapshots
*/
final static int SNAP = 15;
-
+
/**
* This tells the leader that the connecting peer is actually an observer
*/
final static int OBSERVERINFO = 16;
-
+
/**
* This message type is sent by the leader to indicate it's zxid and if
* needed, its database.
@@ -188,7 +188,7 @@ public class Leader {
* This message is used by the follow to ack a proposed epoch.
*/
public static final int ACKEPOCH = 18;
-
+
/**
* This message type is sent to a leader to request and mutation operation.
* The payload will consist of a request header followed by a request.
@@ -227,27 +227,27 @@ public class Leader {
* between the leader and the follower.
*/
final static int SYNC = 7;
-
+
/**
* This message type informs observers of a committed proposal.
*/
final static int INFORM = 8;
- ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
+ final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
+
+ private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
- ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
+ private final Proposal newLeaderProposal = new Proposal();
- Proposal newLeaderProposal = new Proposal();
-
class LearnerCnxAcceptor extends Thread{
private volatile boolean stop = false;
-
+
@Override
public void run() {
try {
while (!stop) {
try{
- Socket s = ss.accept();
+ Socket s = ss.accept();
s.setSoTimeout(self.tickTime * self.syncLimit);
s.setTcpNoDelay(nodelay);
LearnerHandler fh = new LearnerHandler(s, Leader.this);
@@ -270,21 +270,21 @@ public class Leader {
LOG.warn("Exception while accepting follower", e);
}
}
-
+
public void halt() {
stop = true;
}
}
StateSummary leaderStateSummary;
-
+
long epoch = -1;
boolean waitingForNewEpoch = true;
volatile boolean readyToStart = false;
-
+
/**
* This method is main function that is called to lead
- *
+ *
* @throws IOException
* @throws InterruptedException
*/
@@ -300,26 +300,26 @@ public class Leader {
try {
self.tick = 0;
zk.loadData();
-
+
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
- // Start thread that waits for connection requests from
+ // Start thread that waits for connection requests from
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
-
+
readyToStart = true;
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
self.setAcceptedEpoch(epoch);
-
+
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
-
+
/*
synchronized(this){
lastProposed = zk.getZxid();
}
*/
-
+
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);
@@ -330,7 +330,7 @@ public class Leader {
}
outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
newLeaderProposal.ackSet.add(self.getId());
-
+
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
@@ -345,12 +345,12 @@ public class Leader {
StringBuilder ackToString = new StringBuilder();
for(Long id : newLeaderProposal.ackSet)
ackToString.append(id + ": ");
-
+
shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
HashSet<Long> followerSet = new HashSet<Long>();
for(LearnerHandler f : learners)
followerSet.add(f.getSid());
-
+
if (self.getQuorumVerifier().containsQuorum(followerSet)) {
//if (followers.size() >= self.quorumPeers.size() / 2) {
LOG.warn("Enough followers present. "+
@@ -361,7 +361,7 @@ public class Leader {
Thread.sleep(self.tickTime);
self.tick++;
}
-
+
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.cnxnFactory.setZooKeeperServer(zk);
}
@@ -375,7 +375,7 @@ public class Leader {
// We ping twice a tick, so we only update the tick every other
// iteration
boolean tickSkip = true;
-
+
while (true) {
Thread.sleep(self.tickTime / 2);
if (!tickSkip) {
@@ -383,7 +383,7 @@ public class Leader {
}
int syncedCount = 0;
HashSet<Long> syncedSet = new HashSet<Long>();
-
+
// lock on the followers when we use it.
syncedSet.add(self.getId());
synchronized (learners) {
@@ -404,7 +404,7 @@ public class Leader {
// make sure the order is the same!
// the leader goes to looking
return;
- }
+ }
tickSkip = !tickSkip;
}
} finally {
@@ -423,14 +423,14 @@ public class Leader {
if (isShutdown) {
return;
}
-
+
LOG.info("Shutdown called",
new Exception("shutdown Leader! reason: " + reason));
if (cnxAcceptor != null) {
cnxAcceptor.halt();
}
-
+
// NIO should not accept conenctions
self.cnxnFactory.setZooKeeperServer(null);
try {
@@ -458,7 +458,7 @@ public class Leader {
/**
* Keep a count of acks that are received by the leader for a particular
* proposal
- *
+ *
* @param zxid
* the zxid of the proposal sent out
* @param followerAddr
@@ -473,7 +473,7 @@ public class Leader {
}
LOG.trace("outstanding proposals all");
}
-
+
if (outstandingProposals.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("outstanding is 0");
@@ -495,13 +495,13 @@ public class Leader {
+ Long.toHexString(zxid) + " from " + followerAddr);
return;
}
-
+
p.ackSet.add(sid);
if (LOG.isDebugEnabled()) {
LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
+ " is " + p.ackSet.size());
}
- if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
+ if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
if (zxid != lastCommitted+1) {
LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
+ " from " + followerAddr + " not first!");
@@ -538,21 +538,20 @@ public class Leader {
}
static class ToBeAppliedRequestProcessor implements RequestProcessor {
- private RequestProcessor next;
+ private final RequestProcessor next;
- private ConcurrentLinkedQueue<Proposal> toBeApplied;
+ private final Leader leader;
/**
* This request processor simply maintains the toBeApplied list. For
* this to work next must be a FinalRequestProcessor and
* FinalRequestProcessor.processRequest MUST process the request
* synchronously!
- *
+ *
* @param next
* a reference to the FinalRequestProcessor
*/
- ToBeAppliedRequestProcessor(RequestProcessor next,
- ConcurrentLinkedQueue<Proposal> toBeApplied) {
+ ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
if (!(next instanceof FinalRequestProcessor)) {
throw new RuntimeException(ToBeAppliedRequestProcessor.class
.getName()
@@ -561,28 +560,26 @@ public class Leader {
+ " not "
+ next.getClass().getName());
}
- this.toBeApplied = toBeApplied;
+ this.leader = leader;
this.next = next;
}
/*
* (non-Javadoc)
- *
+ *
* @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
*/
public void processRequest(Request request) {
- // request.addRQRec(">tobe");
next.processRequest(request);
- Proposal p = toBeApplied.peek();
- if (p != null && p.request != null
- && p.request.zxid == request.zxid) {
- toBeApplied.remove();
+ Proposal p = leader.toBeApplied.peek();
+ if (p != null && p.request != null && p.request.zxid == request.zxid) {
+ leader.toBeApplied.remove();
}
}
/*
* (non-Javadoc)
- *
+ *
* @see org.apache.zookeeper.server.RequestProcessor#shutdown()
*/
public void shutdown() {
@@ -593,24 +590,24 @@ public class Leader {
/**
* send a packet to all the followers ready to follow
- *
+ *
* @param qp
* the packet to be sent
*/
void sendPacket(QuorumPacket qp) {
synchronized (forwardingFollowers) {
- for (LearnerHandler f : forwardingFollowers) {
+ for (LearnerHandler f : forwardingFollowers) {
f.queuePacket(qp);
}
}
}
-
+
/**
- * send a packet to all observers
+ * send a packet to all observers
*/
- void sendObserverPacket(QuorumPacket qp) {
+ void sendObserverPacket(QuorumPacket qp) {
synchronized(observingLearners) {
- for (LearnerHandler f : observingLearners) {
+ for (LearnerHandler f : observingLearners) {
f.queuePacket(qp);
}
}
@@ -620,7 +617,7 @@ public class Leader {
/**
* Create a commit packet and send it to all the members of the quorum
- *
+ *
* @param zxid
*/
public void commit(long zxid) {
@@ -630,33 +627,33 @@ public class Leader {
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
sendPacket(qp);
}
-
+
/**
* Create an inform packet and send it to all observers.
* @param zxid
* @param proposal
*/
- public void inform(Proposal proposal) {
- QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
+ public void inform(Proposal proposal) {
+ QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
proposal.packet.getData(), null);
sendObserverPacket(qp);
}
long lastProposed;
-
+
/**
* Returns the current epoch of the leader.
- *
+ *
* @return
*/
public long getEpoch(){
return ZxidUtils.getEpochFromZxid(lastProposed);
}
-
+
/**
* create a proposal and send it out to all the members
- *
+ *
* @param request
* @return the proposal that is queued to send to all the members
*/
@@ -664,17 +661,17 @@ public class Leader {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
try {
- request.hdr.serialize(boa, "hdr");
- if (request.txn != null) {
- request.txn.serialize(boa, "txn");
+ request.getHdr().serialize(boa, "hdr");
+ if (request.getTxn() != null) {
+ request.getTxn().serialize(boa, "txn");
}
baos.close();
} catch (IOException e) {
LOG.warn("This really should be impossible", e);
}
- QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
+ QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
baos.toByteArray(), null);
-
+
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
@@ -689,13 +686,13 @@ public class Leader {
}
return p;
}
-
+
/**
* Process sync requests
- *
+ *
* @param r the request
*/
-
+
synchronized public void processSync(LearnerSyncRequest r){
if(outstandingProposals.isEmpty()){
sendSync(r);
@@ -708,23 +705,23 @@ public class Leader {
pendingSyncs.put(lastProposed, l);
}
}
-
+
/**
* Sends a sync message to the appropriate server
- *
+ *
* @param f
* @param r
*/
-
+
public void sendSync(LearnerSyncRequest r){
QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
r.fh.queuePacket(qp);
}
-
+
/**
* lets the leader know that a follower is capable of following and is done
* syncing
- *
+ *
* @param handler handler of the follower
* @return last proposed zxid
*/
@@ -762,11 +759,11 @@ public class Leader {
observingLearners.add(handler);
}
}
-
+
return lastProposed;
}
- private HashSet<Long> connectingFollowers = new HashSet<Long>();
+ private final HashSet<Long> connectingFollowers = new HashSet<Long>();
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException {
synchronized(connectingFollowers) {
if (!waitingForNewEpoch) {
@@ -789,14 +786,14 @@ public class Leader {
cur = System.currentTimeMillis();
}
if (waitingForNewEpoch) {
- throw new InterruptedException("Timeout while waiting for epoch from quorum");
+ throw new InterruptedException("Timeout while waiting for epoch from quorum");
}
}
return epoch;
}
}
- private HashSet<Long> electingFollowers = new HashSet<Long>();
+ private final HashSet<Long> electingFollowers = new HashSet<Long>();
private boolean electionFinished = false;
public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
synchronized(electingFollowers) {
@@ -813,7 +810,7 @@ public class Leader {
if (readyToStart && verifier.containsQuorum(electingFollowers)) {
electionFinished = true;
electingFollowers.notifyAll();
- } else {
+ } else {
long start = System.currentTimeMillis();
long cur = start;
long end = start + self.getInitLimit()*self.getTickTime();
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Thu Oct 20 18:02:18 2011
@@ -32,7 +32,7 @@ import org.apache.zookeeper.server.ZKDat
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
/**
- *
+ *
* Just like the standard ZooKeeperServer. We just replace the request
* processors: PrepRequestProcessor -> ProposalRequestProcessor ->
* CommitProcessor -> Leader.ToBeAppliedRequestProcessor ->
@@ -55,12 +55,11 @@ public class LeaderZooKeeperServer exten
public Leader getLeader(){
return self.leader;
}
-
+
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
- finalProcessor, getLeader().toBeApplied);
+ RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false);
commitProcessor.start();
@@ -75,7 +74,7 @@ public class LeaderZooKeeperServer exten
public int getGlobalOutstandingLimit() {
return super.getGlobalOutstandingLimit() / (self.getQuorumSize() - 1);
}
-
+
@Override
protected void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
@@ -146,7 +145,7 @@ public class LeaderZooKeeperServer exten
}
jmxServerBean = null;
}
-
+
@Override
public String getState() {
return "leader";
@@ -154,13 +153,13 @@ public class LeaderZooKeeperServer exten
/**
* Returns the id of the associated QuorumPeer, which will do for a unique
- * id of this server.
+ * id of this server.
*/
@Override
public long getServerId() {
return self.getId();
- }
-
+ }
+
@Override
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java Thu Oct 20 18:02:18 2011
@@ -32,11 +32,11 @@ import org.apache.zookeeper.txn.TxnHeade
* Instead, they are informed of successful proposals by the Leader. Observers
* therefore naturally act as a relay point for publishing the proposal stream
* and can relieve Followers of some of the connection load. Observers may
- * submit proposals, but do not vote in their acceptance.
+ * submit proposals, but do not vote in their acceptance.
*
- * See ZOOKEEPER-368 for a discussion of this feature.
+ * See ZOOKEEPER-368 for a discussion of this feature.
*/
-public class Observer extends Learner{
+public class Observer extends Learner{
Observer(QuorumPeer self,ObserverZooKeeperServer observerZooKeeperServer) {
this.self = self;
@@ -46,12 +46,12 @@ public class Observer extends Learner{
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("Observer ").append(sock);
+ sb.append("Observer ").append(sock);
sb.append(" pendingRevalidationCount:")
.append(pendingRevalidations.size());
return sb.toString();
}
-
+
/**
* the main method called by the observer to observe the leader
*
@@ -66,12 +66,12 @@ public class Observer extends Learner{
try {
connectToLeader(addr);
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
-
+
syncWithLeader(newLeaderZxid);
QuorumPacket qp = new QuorumPacket();
while (self.isRunning()) {
readPacket(qp);
- processPacket(qp);
+ processPacket(qp);
}
} catch (IOException e) {
LOG.warn("Exception when observing the leader", e);
@@ -80,7 +80,7 @@ public class Observer extends Learner{
} catch (IOException e1) {
e1.printStackTrace();
}
-
+
// clear pending revalidations
pendingRevalidations.clear();
}
@@ -88,7 +88,7 @@ public class Observer extends Learner{
zk.unregisterJMX(this);
}
}
-
+
/**
* Controls the response of an observer to the receipt of a quorumpacket
* @param qp
@@ -103,8 +103,8 @@ public class Observer extends Learner{
LOG.warn("Ignoring proposal");
break;
case Leader.COMMIT:
- LOG.warn("Ignoring commit");
- break;
+ LOG.warn("Ignoring commit");
+ break;
case Leader.UPTODATE:
LOG.error("Received an UPTODATE message after Observer started");
break;
@@ -114,16 +114,12 @@ public class Observer extends Learner{
case Leader.SYNC:
((ObserverZooKeeperServer)zk).sync();
break;
- case Leader.INFORM:
+ case Leader.INFORM:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
- Request request = new Request (null, hdr.getClientId(),
- hdr.getCxid(),
- hdr.getType(), null, null);
- request.txn = txn;
- request.hdr = hdr;
+ Request request = new Request (hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk;
- obs.commitRequest(request);
+ obs.commitRequest(request);
break;
}
}
@@ -131,7 +127,7 @@ public class Observer extends Learner{
/**
* Shutdown the Observer.
*/
- public void shutdown() {
+ public void shutdown() {
LOG.info("shutdown called", new Exception("shutdown Observer"));
super.shutdown();
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java Thu Oct 20 18:02:18 2011
@@ -33,7 +33,7 @@ public class ProposalRequestProcessor im
LoggerFactory.getLogger(ProposalRequestProcessor.class);
LeaderZooKeeperServer zks;
-
+
RequestProcessor nextProcessor;
SyncRequestProcessor syncProcessor;
@@ -45,33 +45,33 @@ public class ProposalRequestProcessor im
AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
}
-
+
/**
* initialize this processor
*/
public void initialize() {
syncProcessor.start();
}
-
+
public void processRequest(Request request) {
// LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
// request.type + " id = " + request.sessionId);
// request.addRQRec(">prop");
-
-
- /* In the following IF-THEN-ELSE block, we process syncs on the leader.
+
+
+ /* In the following IF-THEN-ELSE block, we process syncs on the leader.
* If the sync is coming from a follower, then the follower
* handler adds it to syncHandler. Otherwise, if it is a client of
- * the leader that issued the sync command, then syncHandler won't
- * contain the handler. In this case, we add it to syncHandler, and
+ * the leader that issued the sync command, then syncHandler won't
+ * contain the handler. In this case, we add it to syncHandler, and
* call processRequest on the next processor.
*/
-
+
if(request instanceof LearnerSyncRequest){
zks.getLeader().processSync((LearnerSyncRequest)request);
} else {
nextProcessor.processRequest(request);
- if (request.hdr != null) {
+ if (request.getHdr() != null) {
// We need to sync and get consensus on any transactions
zks.getLeader().propose(request);
syncProcessor.processRequest(request);
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java Thu Oct 20 18:02:18 2011
@@ -30,7 +30,7 @@ import org.apache.zookeeper.server.Reque
public class SendAckRequestProcessor implements RequestProcessor, Flushable {
private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class);
-
+
Learner learner;
SendAckRequestProcessor(Learner peer) {
@@ -39,7 +39,7 @@ public class SendAckRequestProcessor imp
public void processRequest(Request si) {
if(si.type != OpCode.sync){
- QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
+ QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,
null);
try {
learner.writePacket(qp, false);
@@ -56,7 +56,7 @@ public class SendAckRequestProcessor imp
}
}
}
-
+
public void flush() throws IOException {
try {
learner.writePacket(null, true);
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=1186967&r1=1186966&r2=1186967&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Thu Oct 20 18:02:18 2011
@@ -168,7 +168,7 @@ public class QuorumPeerMainTest extends
// just make sure that we actually did get it in process at the
// leader
Assert.assertTrue(outstanding.size() == 1);
- Assert.assertTrue(((Proposal) outstanding.values().iterator().next()).request.hdr.getType() == OpCode.create);
+ Assert.assertTrue(((Proposal) outstanding.values().iterator().next()).request.getHdr().getType() == OpCode.create);
// make sure it has a chance to write it to disk
Thread.sleep(1000);
mt[leader].shutdown();
|