Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=671303&r1=671302&r2=671303&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Tue Jun 24 12:04:58 2008
@@ -1,369 +1,369 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.InputArchive;
-import org.apache.jute.OutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooTrace;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.txn.TxnHeader;
-
-/**
- * This class has the control logic for the Follower.
- */
-public class Follower {
- private static final Logger LOG = Logger.getLogger(Follower.class);
-
- QuorumPeer self;
-
- FollowerZooKeeperServer zk;
-
- Follower(QuorumPeer self,FollowerZooKeeperServer zk) {
- this.self = self;
- this.zk=zk;
- }
-
- private InputArchive leaderIs;
-
- private OutputArchive leaderOs;
-
- private BufferedOutputStream bufferedOutput;
-
- public Socket sock;
-
- /**
- * write a packet to the leader
- *
- * @param pp
- * the proposal packet to be sent to the leader
- * @throws IOException
- */
- void writePacket(QuorumPacket pp) throws IOException {
- long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
- if (pp.getType() == Leader.PING) {
- traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
- }
- ZooTrace.logQuorumPacket(LOG, traceMask, 'o', pp);
- synchronized (leaderOs) {
- leaderOs.writeRecord(pp, "packet");
- bufferedOutput.flush();
- }
- }
-
- /**
- * read a packet from the leader
- *
- * @param pp
- * the packet to be instantiated
- * @throws IOException
- */
- void readPacket(QuorumPacket pp) throws IOException {
- synchronized (leaderIs) {
- leaderIs.readRecord(pp, "packet");
- }
- long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
- if (pp.getType() == Leader.PING) {
- traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
- }
- ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
- }
-
- /**
- * the main method called by the follower to follow the leader
- *
- * @throws InterruptedException
- */
- void followLeader() throws InterruptedException {
- InetSocketAddress addr = null;
- // Find the leader by id
- for (QuorumServer s : self.quorumPeers) {
- if (s.id == self.currentVote.id) {
- addr = s.addr;
- break;
- }
- }
- if (addr == null) {
- LOG.warn("Couldn't find the leader with id = "
- + self.currentVote.id);
- }
- LOG.info("Following " + addr);
- sock = new Socket();
- try {
- QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
- sock.setSoTimeout(self.tickTime * self.initLimit);
- for (int tries = 0; tries < 5; tries++) {
- try {
- //sock = new Socket();
- //sock.setSoTimeout(self.tickTime * self.initLimit);
- sock.connect(addr, self.tickTime * self.syncLimit);
- sock.setTcpNoDelay(true);
- break;
- } catch (ConnectException e) {
- if (tries == 4) {
- LOG.error("Unexpected exception",e);
- throw e;
- } else {
- LOG.warn("Unexpected exception",e);
- sock = new Socket();
- sock.setSoTimeout(self.tickTime * self.initLimit);
- }
- }
- Thread.sleep(1000);
- }
- leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
- sock.getInputStream()));
- bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
- leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
- QuorumPacket qp = new QuorumPacket();
- qp.setType(Leader.LASTZXID);
- long sentLastZxid = self.getLastLoggedZxid();
- qp.setZxid(sentLastZxid);
- writePacket(qp);
- readPacket(qp);
- long newLeaderZxid = qp.getZxid();
-
- if (qp.getType() != Leader.NEWLEADER) {
- LOG.error("First packet should have been NEWLEADER");
- throw new IOException("First packet should have been NEWLEADER");
- }
- readPacket(qp);
- synchronized (zk) {
- if (qp.getType() == Leader.DIFF) {
- LOG.info("Getting a diff from the leader!");
- zk.loadData();
- }
- else if (qp.getType() == Leader.SNAP) {
- LOG.info("Getting a snapshot from leader");
- // The leader is going to dump the database
- zk.loadData(leaderIs);
- String signature = leaderIs.readString("signature");
- if (!signature.equals("BenWasHere")) {
- LOG.error("Missing signature. Got " + signature);
- throw new IOException("Missing signature");
- }
- } else if (qp.getType() == Leader.TRUNC) {
- //we need to truncate the log to the lastzxid of the leader
- LOG.warn("Truncating log to get in sync with the leader "
- + Long.toHexString(qp.getZxid()));
- zk.truncateLog(qp.getZxid());
- zk.loadData();
- }
- else {
- LOG.error("Got unexpected packet from leader "
- + qp.getType() + " exiting ... " );
- System.exit(13);
- }
- zk.dataTree.lastProcessedZxid = newLeaderZxid;
- }
- ack.setZxid(newLeaderZxid & ~0xffffffffL);
- writePacket(ack);
- sock.setSoTimeout(self.tickTime * self.syncLimit);
- zk.startup();
- while (self.running) {
- readPacket(qp);
- switch (qp.getType()) {
- case Leader.PING:
- // Send back the ping with our session data
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- HashMap<Long, Integer> touchTable = ((FollowerZooKeeperServer) zk)
- .getTouchSnapshot();
- for (Entry<Long, Integer> entry : touchTable.entrySet()) {
- dos.writeLong(entry.getKey());
- dos.writeInt(entry.getValue());
- }
- qp.setData(bos.toByteArray());
- writePacket(qp);
- break;
- case Leader.PROPOSAL:
- TxnHeader hdr = new TxnHeader();
- BinaryInputArchive ia = BinaryInputArchive
- .getArchive(new ByteArrayInputStream(qp.getData()));
- Record txn = ZooKeeperServer.deserializeTxn(ia, hdr);
- if (hdr.getZxid() != lastQueued + 1) {
- LOG.warn("Got zxid "
- + Long.toHexString(hdr.getZxid())
- + " expected "
- + Long.toHexString(lastQueued + 1));
- }
- lastQueued = hdr.getZxid();
- zk.logRequest(hdr, txn);
- break;
- case Leader.COMMIT:
- zk.commit(qp.getZxid());
- break;
- case Leader.UPTODATE:
- zk.snapshot();
- self.cnxnFactory.setZooKeeperServer(zk);
- break;
- case Leader.REVALIDATE:
- ByteArrayInputStream bis = new ByteArrayInputStream(qp
- .getData());
- DataInputStream dis = new DataInputStream(bis);
- long sessionId = dis.readLong();
- boolean valid = dis.readBoolean();
- synchronized (pendingRevalidations) {
- ServerCnxn cnxn = pendingRevalidations
- .remove(sessionId);
- if (cnxn == null) {
- LOG.warn("Missing "
- + Long.toHexString(sessionId)
- + " for validation");
- } else {
- cnxn.finishSessionInit(valid);
- }
- }
- ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
- "Session " + sessionId
- + " is valid: " + valid);
- break;
- case Leader.SYNC:
- zk.sync();
- break;
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- try {
- sock.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
-
- synchronized (pendingRevalidations) {
- // clear pending revalitions
- pendingRevalidations.clear();
- pendingRevalidations.notifyAll();
- }
- }
- }
-
- private long lastQueued;
-
- ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
-
- /**
- * validate a seesion for a client
- *
- * @param clientId
- * the client to be revailidated
- * @param timeout
- * the timeout for which the session is valid
- * @return
- * @throws IOException
- * @throws InterruptedException
- */
- void validateSession(ServerCnxn cnxn, long clientId, int timeout)
- throws IOException, InterruptedException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- dos.writeLong(clientId);
- dos.writeInt(timeout);
- dos.close();
- QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
- .toByteArray(), null);
- pendingRevalidations.put(clientId, cnxn);
- ZooTrace.logTraceMessage(LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "To validate session "
- + Long.toHexString(clientId));
- writePacket(qp);
- }
-
- /**
- * send a request packet to the leader
- *
- * @param request
- * the request from the client
- * @throws IOException
- */
- void request(Request request) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream oa = new DataOutputStream(baos);
- oa.writeLong(request.sessionId);
- oa.writeInt(request.cxid);
- oa.writeInt(request.type);
- if (request.request != null) {
- request.request.rewind();
- int len = request.request.remaining();
- byte b[] = new byte[len];
- request.request.get(b);
- request.request.rewind();
- oa.write(b);
- }
- oa.close();
- QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
- .toByteArray(), request.authInfo);
-// QuorumPacket qp;
-// if(request.type == OpCode.sync){
-// qp = new QuorumPacket(Leader.SYNC, -1, baos
-// .toByteArray(), request.authInfo);
-// }
-// else{
-// qp = new QuorumPacket(Leader.REQUEST, -1, baos
-// .toByteArray(), request.authInfo);
-// }
- writePacket(qp);
- }
-
- public long getZxid() {
- try {
- synchronized (zk) {
- return zk.getZxid();
- }
- } catch (NullPointerException e) {
- }
- return -1;
- }
-
- public void shutdown() {
- // set the zookeeper server to null
- self.cnxnFactory.setZooKeeperServer(null);
- // clear all the connections
- self.cnxnFactory.clear();
- // shutdown previous zookeeper
- if (zk != null) {
- zk.shutdown();
-
- }
- LOG.error("FIXMSG",new Exception("shutdown Follower"));
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * This class has the control logic for the Follower.
+ */
+public class Follower {
+ private static final Logger LOG = Logger.getLogger(Follower.class);
+
+ QuorumPeer self; // peer state read by followLeader(): quorumPeers, currentVote, tickTime, initLimit, syncLimit, running
+
+ FollowerZooKeeperServer zk; // server that followLeader() loads data into and hands proposals/commits to
+
+ Follower(QuorumPeer self,FollowerZooKeeperServer zk) { // only stores the two references; no socket is opened here
+ this.self = self;
+ this.zk=zk;
+ }
+
+ private InputArchive leaderIs; // archive over the socket input stream; assigned in followLeader()
+
+ private OutputArchive leaderOs; // archive writing into bufferedOutput; assigned in followLeader()
+
+ private BufferedOutputStream bufferedOutput; // wraps the socket output stream; flushed after every packet in writePacket()
+
+ public Socket sock; // connection to the leader; created in followLeader() and closed there on IOException
+
+ /**
+ * Traces, serializes and flushes one packet to the leader.
+ *
+ * @param pp
+ * the packet to send; PING packets are traced under the ping trace mask
+ * @throws IOException if writing the record or flushing the buffered stream fails
+ */
+ void writePacket(QuorumPacket pp) throws IOException {
+ long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+ if (pp.getType() == Leader.PING) {
+ traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+ }
+ ZooTrace.logQuorumPacket(LOG, traceMask, 'o', pp); // 'o' marks an outgoing packet (readPacket uses 'i')
+ synchronized (leaderOs) { // NOTE(review): leaderOs is only assigned in followLeader(); an earlier call would NPE here
+ leaderOs.writeRecord(pp, "packet");
+ bufferedOutput.flush(); // flushed inside the lock so a record and its flush are not interleaved with another writer
+ }
+ }
+
+ /**
+ * Reads the next packet from the leader into pp, then traces it.
+ *
+ * @param pp
+ * the packet object to be populated in place
+ * @throws IOException if reading the record from the leader stream fails
+ */
+ void readPacket(QuorumPacket pp) throws IOException {
+ synchronized (leaderIs) { // NOTE(review): leaderIs is only assigned in followLeader(); an earlier call would NPE here
+ leaderIs.readRecord(pp, "packet");
+ }
+ long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+ if (pp.getType() == Leader.PING) { // the type is only known after the read, so tracing happens afterwards
+ traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+ }
+ ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp); // 'i' marks an incoming packet
+ }
+
+ /**
+ * Connects to the voted leader, syncs state with it, then processes leader packets while self.running.
+ *
+ * @throws InterruptedException e.g. if the one-second sleep between connection attempts is interrupted
+ */
+ void followLeader() throws InterruptedException {
+ InetSocketAddress addr = null;
+ // Find the leader by id
+ for (QuorumServer s : self.quorumPeers) {
+ if (s.id == self.currentVote.id) {
+ addr = s.addr;
+ break;
+ }
+ }
+ if (addr == null) { // NOTE(review): only warns; the null addr still reaches sock.connect() below
+ LOG.warn("Couldn't find the leader with id = "
+ + self.currentVote.id);
+ }
+ LOG.info("Following " + addr);
+ sock = new Socket();
+ try {
+ QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
+ sock.setSoTimeout(self.tickTime * self.initLimit); // read timeout used during the initial handshake
+ for (int tries = 0; tries < 5; tries++) { // at most five connection attempts
+ try {
+ //sock = new Socket();
+ //sock.setSoTimeout(self.tickTime * self.initLimit);
+ sock.connect(addr, self.tickTime * self.syncLimit); // connect timeout is tickTime * syncLimit
+ sock.setTcpNoDelay(true);
+ break;
+ } catch (ConnectException e) {
+ if (tries == 4) { // last attempt: log at ERROR and rethrow to the outer IOException handler
+ LOG.error("Unexpected exception",e);
+ throw e;
+ } else {
+ LOG.warn("Unexpected exception",e);
+ sock = new Socket(); // the socket whose connect failed is replaced by a fresh one
+ sock.setSoTimeout(self.tickTime * self.initLimit);
+ }
+ }
+ Thread.sleep(1000); // reached only after a failed attempt; success breaks out above
+ }
+ leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
+ sock.getInputStream()));
+ bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
+ leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
+ QuorumPacket qp = new QuorumPacket(); // this one packet object is reused for every read below
+ qp.setType(Leader.LASTZXID);
+ long sentLastZxid = self.getLastLoggedZxid();
+ qp.setZxid(sentLastZxid);
+ writePacket(qp); // handshake step 1: tell the leader our last logged zxid
+ readPacket(qp); // handshake step 2: the reply must be NEWLEADER (checked below)
+ long newLeaderZxid = qp.getZxid();
+
+ if (qp.getType() != Leader.NEWLEADER) {
+ LOG.error("First packet should have been NEWLEADER");
+ throw new IOException("First packet should have been NEWLEADER");
+ }
+ readPacket(qp); // the next packet selects the sync mode: DIFF, SNAP or TRUNC
+ synchronized (zk) {
+ if (qp.getType() == Leader.DIFF) {
+ LOG.info("Getting a diff from the leader!");
+ zk.loadData();
+ }
+ else if (qp.getType() == Leader.SNAP) {
+ LOG.info("Getting a snapshot from leader");
+ // The leader is going to dump the database
+ zk.loadData(leaderIs);
+ String signature = leaderIs.readString("signature"); // trailer string expected right after the snapshot data
+ if (!signature.equals("BenWasHere")) {
+ LOG.error("Missing signature. Got " + signature);
+ throw new IOException("Missing signature");
+ }
+ } else if (qp.getType() == Leader.TRUNC) {
+ // we need to truncate the log to the zxid carried by the TRUNC packet
+ LOG.warn("Truncating log to get in sync with the leader "
+ + Long.toHexString(qp.getZxid()));
+ zk.truncateLog(qp.getZxid());
+ zk.loadData();
+ }
+ else {
+ LOG.error("Got unexpected packet from leader "
+ + qp.getType() + " exiting ... " );
+ System.exit(13); // NOTE(review): terminates the whole JVM, not just this follower
+ }
+ zk.dataTree.lastProcessedZxid = newLeaderZxid;
+ }
+ ack.setZxid(newLeaderZxid & ~0xffffffffL); // the ACK carries newLeaderZxid with its low 32 bits cleared
+ writePacket(ack);
+ sock.setSoTimeout(self.tickTime * self.syncLimit); // from here on reads time out after tickTime * syncLimit
+ zk.startup();
+ while (self.running) {
+ readPacket(qp);
+ switch (qp.getType()) {
+ case Leader.PING:
+ // Send back the ping with our session data
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ HashMap<Long, Integer> touchTable = ((FollowerZooKeeperServer) zk)
+ .getTouchSnapshot();
+ for (Entry<Long, Integer> entry : touchTable.entrySet()) { // payload is a sequence of (long key, int value) pairs
+ dos.writeLong(entry.getKey());
+ dos.writeInt(entry.getValue());
+ }
+ qp.setData(bos.toByteArray()); // the incoming PING packet itself is reused as the reply
+ writePacket(qp);
+ break;
+ case Leader.PROPOSAL:
+ TxnHeader hdr = new TxnHeader();
+ BinaryInputArchive ia = BinaryInputArchive
+ .getArchive(new ByteArrayInputStream(qp.getData()));
+ Record txn = ZooKeeperServer.deserializeTxn(ia, hdr);
+ if (hdr.getZxid() != lastQueued + 1) { // an out-of-sequence zxid is only logged; the proposal is still queued
+ LOG.warn("Got zxid "
+ + Long.toHexString(hdr.getZxid())
+ + " expected "
+ + Long.toHexString(lastQueued + 1));
+ }
+ lastQueued = hdr.getZxid();
+ zk.logRequest(hdr, txn);
+ break;
+ case Leader.COMMIT:
+ zk.commit(qp.getZxid());
+ break;
+ case Leader.UPTODATE:
+ zk.snapshot();
+ self.cnxnFactory.setZooKeeperServer(zk); // zk is handed to the connection factory at this point
+ break;
+ case Leader.REVALIDATE:
+ ByteArrayInputStream bis = new ByteArrayInputStream(qp
+ .getData());
+ DataInputStream dis = new DataInputStream(bis);
+ long sessionId = dis.readLong(); // reply payload: session id followed by a validity flag
+ boolean valid = dis.readBoolean();
+ synchronized (pendingRevalidations) {
+ ServerCnxn cnxn = pendingRevalidations
+ .remove(sessionId);
+ if (cnxn == null) { // no entry was registered (or it was already cleared) for this session
+ LOG.warn("Missing "
+ + Long.toHexString(sessionId)
+ + " for validation");
+ } else {
+ cnxn.finishSessionInit(valid);
+ }
+ }
+ ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
+ "Session " + sessionId
+ + " is valid: " + valid);
+ break;
+ case Leader.SYNC:
+ zk.sync();
+ break;
+ } // no default case: any other packet type is silently ignored
+ }
+ } catch (IOException e) {
+ e.printStackTrace(); // NOTE(review): goes to stderr instead of LOG
+ try {
+ sock.close();
+ } catch (IOException e1) {
+ e1.printStackTrace(); // NOTE(review): goes to stderr instead of LOG
+ }
+
+ synchronized (pendingRevalidations) {
+ // clear pending revalidations
+ pendingRevalidations.clear();
+ pendingRevalidations.notifyAll(); // wakes threads waiting on this map's monitor (no wait() on it is visible in this class)
+ }
+ }
+ }
+
+ private long lastQueued; // zxid of the last PROPOSAL passed to zk.logRequest() in followLeader()
+
+ ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>(); // session id -> connection awaiting a REVALIDATE reply
+
+ /**
+ * Asks the leader to revalidate a client session; the reply is handled in followLeader().
+ *
+ * @param cnxn the connection stored in pendingRevalidations under clientId
+ * @param clientId
+ * the session id to be revalidated
+ * @param timeout
+ * the timeout value written into the REVALIDATE packet
+ * @throws IOException if building or sending the packet fails
+ * @throws InterruptedException declared, though nothing visible in this body throws it
+ */
+ void validateSession(ServerCnxn cnxn, long clientId, int timeout)
+ throws IOException, InterruptedException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.writeLong(clientId); // payload layout: long session id, then int timeout
+ dos.writeInt(timeout);
+ dos.close();
+ QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
+ .toByteArray(), null);
+ pendingRevalidations.put(clientId, cnxn); // registered before sending so the reply can find the connection
+ ZooTrace.logTraceMessage(LOG,
+ ZooTrace.SESSION_TRACE_MASK,
+ "To validate session "
+ + Long.toHexString(clientId));
+ writePacket(qp); // NOTE(review): if this throws, the entry added above is not removed here
+ }
+
+ /**
+ * Serializes a client request and forwards it to the leader as a REQUEST packet.
+ *
+ * @param request
+ * the client request; sessionId, cxid, type and the request buffer are copied into the payload
+ * @throws IOException if serializing or writing the packet fails
+ */
+ void request(Request request) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream oa = new DataOutputStream(baos);
+ oa.writeLong(request.sessionId); // payload layout: long sessionId, int cxid, int type, then the raw request bytes
+ oa.writeInt(request.cxid);
+ oa.writeInt(request.type);
+ if (request.request != null) {
+ request.request.rewind(); // copy from position 0 regardless of the buffer's current position
+ int len = request.request.remaining();
+ byte b[] = new byte[len];
+ request.request.get(b);
+ request.request.rewind(); // reset the position to 0 again after the copy
+ oa.write(b);
+ }
+ oa.close();
+ QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
+ .toByteArray(), request.authInfo);
+// QuorumPacket qp;
+// if(request.type == OpCode.sync){
+// qp = new QuorumPacket(Leader.SYNC, -1, baos
+// .toByteArray(), request.authInfo);
+// }
+// else{
+// qp = new QuorumPacket(Leader.REQUEST, -1, baos
+// .toByteArray(), request.authInfo);
+// }
+ writePacket(qp);
+ }
+
+ public long getZxid() { // returns zk.getZxid(), or -1 when a NullPointerException is raised
+ try {
+ synchronized (zk) { // synchronizing on a null zk throws NullPointerException, which is caught below
+ return zk.getZxid();
+ }
+ } catch (NullPointerException e) { // deliberately ignored; NOTE(review): also hides an NPE thrown inside zk.getZxid()
+ }
+ return -1;
+ }
+
+ public void shutdown() { // NOTE(review): sock is not closed in this method
+ // set the zookeeper server to null
+ self.cnxnFactory.setZooKeeperServer(null);
+ // clear all the connections
+ self.cnxnFactory.clear();
+ // shut down the zookeeper server held by this follower, if any
+ if (zk != null) {
+ zk.shutdown();
+
+ }
+ LOG.error("FIXMSG",new Exception("shutdown Follower")); // logs at ERROR with a synthetic exception; presumably only to record the caller's stack trace
+ }
+}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java?rev=671303&r1=671302&r2=671303&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java Tue Jun 24 12:04:58 2008
@@ -1,391 +1,391 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.log4j.Logger;
-
-import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooTrace;
-import org.apache.zookeeper.server.quorum.Leader.Proposal;
-import org.apache.zookeeper.txn.TxnHeader;
-
-/**
- * There will be an instance of this class created by the Leader for each
- * follower.All communication for a given Follower will be handled by this
- * class.
- */
-public class FollowerHandler extends Thread {
- private static final Logger LOG = Logger.getLogger(FollowerHandler.class);
-
- public Socket s;
-
- Leader leader;
-
- long tickOfLastAck;
-
- /**
- * The packets to be sent to the follower
- */
- LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
-
- private BinaryInputArchive ia;
-
- private BinaryOutputArchive oa;
-
- private BufferedOutputStream bufferedOutput;
-
- FollowerHandler(Socket s, Leader leader) throws IOException {
- super("FollowerHandler-" + s.getRemoteSocketAddress());
- this.s = s;
- this.leader = leader;
- leader.addFollowerHandler(this);
- start();
- }
-
- /**
- * If this packet is queued, the sender thread will exit
- */
- QuorumPacket proposalOfDeath = new QuorumPacket();
-
- /**
- * This method will use the thread to send packets added to the
- * queuedPackets list
- *
- * @throws InterruptedException
- */
- private void sendPackets() throws InterruptedException {
- long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
- while (true) {
- QuorumPacket p;
- p = queuedPackets.take();
-
- if (p == proposalOfDeath) {
- // Packet of death!
- break;
- }
- if (p.getType() == Leader.PING) {
- traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
- }
- ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
- try {
- oa.writeRecord(p, "packet");
- bufferedOutput.flush();
- } catch (IOException e) {
- if (!s.isClosed()) {
- LOG.warn("Unexpected exception",e);
- }
- break;
- }
- }
- }
-
- static public String packetToString(QuorumPacket p) {
- if (true)
- return null;
- String type = null;
- String mess = null;
- Record txn = null;
- switch (p.getType()) {
- case Leader.ACK:
- type = "ACK";
- break;
- case Leader.COMMIT:
- type = "COMMIT";
- break;
- case Leader.LASTZXID:
- type = "LASTZXID";
- break;
- case Leader.NEWLEADER:
- type = "NEWLEADER";
- break;
- case Leader.PING:
- type = "PING";
- break;
- case Leader.PROPOSAL:
- type = "PROPOSAL";
- BinaryInputArchive ia = BinaryInputArchive
- .getArchive(new ByteArrayInputStream(p.getData()));
- TxnHeader hdr = new TxnHeader();
- try {
- txn = ZooKeeperServer.deserializeTxn(ia, hdr);
- // mess = "transaction: " + txn.toString();
- } catch (IOException e) {
- LOG.warn("Unexpected exception",e);
- }
- break;
- case Leader.REQUEST:
- type = "REQUEST";
- break;
- case Leader.REVALIDATE:
- type = "REVALIDATE";
- ByteArrayInputStream bis = new ByteArrayInputStream(p.getData());
- DataInputStream dis = new DataInputStream(bis);
- try {
- long id = dis.readLong();
- mess = " sessionid = " + id;
- } catch (IOException e) {
- LOG.warn("Unexpected exception", e);
- }
-
- break;
- case Leader.UPTODATE:
- type = "UPTODATE";
- break;
- default:
- type = "UNKNOWN" + p.getType();
- }
- String entry = null;
- if (type != null) {
- entry = type + " " + Long.toHexString(p.getZxid()) + " " + mess;
- }
- return entry;
- }
-
- /**
- * This thread will receive packets from the follower and process them and
- * also listen to new connections from new followers.
- */
- public void run() {
- try {
-
- ia = BinaryInputArchive.getArchive(new BufferedInputStream(s
- .getInputStream()));
- bufferedOutput = new BufferedOutputStream(s.getOutputStream());
- oa = BinaryOutputArchive.getArchive(bufferedOutput);
-
- QuorumPacket qp = new QuorumPacket();
- ia.readRecord(qp, "packet");
- if (qp.getType() != Leader.LASTZXID) {
- LOG.error("First packet " + qp.toString()
- + " is not LASTZXID!");
- return;
- }
- long peerLastZxid = qp.getZxid();
- int packetToSend = Leader.SNAP;
- boolean logTxns = true;
-
- long zxidToSend = 0;
- // we are sending the diff
- synchronized(leader.zk.committedLog) {
- if (leader.zk.committedLog.size() != 0) {
- if ((leader.zk.maxCommittedLog >= peerLastZxid)
- && (leader.zk.minCommittedLog <= peerLastZxid)) {
- packetToSend = Leader.DIFF;
- zxidToSend = leader.zk.maxCommittedLog;
- for (Proposal propose: leader.zk.committedLog) {
- if (propose.packet.getZxid() > peerLastZxid) {
- queuePacket(propose.packet);
- QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
- null, null);
- queuePacket(qcommit);
-
- }
- }
- }
- }
- else {
- logTxns = false;
- } }
- long leaderLastZxid = leader.startForwarding(this, peerLastZxid);
- QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
- leaderLastZxid, null, null);
- oa.writeRecord(newLeaderQP, "packet");
- bufferedOutput.flush();
- // a special case when both the ids are the same
- if (peerLastZxid == leaderLastZxid) {
- packetToSend = Leader.DIFF;
- zxidToSend = leaderLastZxid;
- }
- //check if we decided to send a diff or we need to send a truncate
- // we avoid using epochs for truncating because epochs make things
- // complicated. Two epochs might have the last 32 bits as same.
- // only if we know that there is a committed zxid in the queue that
- // is less than the one the peer has we send a trunc else to make
- // things simple we just send sanpshot.
- if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) {
- // this is the only case that we are sure that
- // we can ask the follower to truncate the log
- packetToSend = Leader.TRUNC;
- zxidToSend = leader.zk.maxCommittedLog;
-
- }
- oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
- bufferedOutput.flush();
- // only if we are not truncating or fast sycning
- if (packetToSend == Leader.SNAP) {
- LOG.warn("Sending snapshot last zxid of peer is "
- + Long.toHexString(peerLastZxid) + " " + " zxid of leader is "
- + Long.toHexString(leaderLastZxid));
- // Dump data to follower
- leader.zk.snapshot(oa);
- oa.writeString("BenWasHere", "signature");
- }
- bufferedOutput.flush();
- //
- // Mutation packets will be queued during the serialize,
- // so we need to mark when the follower can actually start
- // using the data
- //
- queuedPackets
- .add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
-
- // Start sending packets
- new Thread() {
- public void run() {
- Thread.currentThread().setName(
- "Sender-" + s.getRemoteSocketAddress());
- try {
- sendPackets();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted",e);
- }
- }
- }.start();
-
- while (true) {
- qp = new QuorumPacket();
- ia.readRecord(qp, "packet");
-
- long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
- if (qp.getType() == Leader.PING) {
- traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
- }
- ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
- tickOfLastAck = leader.self.tick;
-
-
- ByteBuffer bb;
- long sessionId;
- int cxid;
- int type;
-
- switch (qp.getType()) {
- case Leader.ACK:
- leader.processAck(qp.getZxid(), s.getLocalSocketAddress());
- break;
- case Leader.PING:
- // Process the touches
- ByteArrayInputStream bis = new ByteArrayInputStream(qp
- .getData());
- DataInputStream dis = new DataInputStream(bis);
- while (dis.available() > 0) {
- long sess = dis.readLong();
- int to = dis.readInt();
- leader.zk.touch(sess, to);
- }
- break;
- case Leader.REVALIDATE:
- bis = new ByteArrayInputStream(qp.getData());
- dis = new DataInputStream(bis);
- long id = dis.readLong();
- int to = dis.readInt();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- dos.writeLong(id);
- boolean valid = leader.zk.touch(id, to);
- ZooTrace.logTraceMessage(LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "Session " + Long.toHexString(id)
- + " is valid: "+ valid);
- dos.writeBoolean(valid);
- qp.setData(bos.toByteArray());
- queuedPackets.add(qp);
- break;
- case Leader.REQUEST:
- bb = ByteBuffer.wrap(qp.getData());
- sessionId = bb.getLong();
- cxid = bb.getInt();
- type = bb.getInt();
- bb = bb.slice();
- if(type == OpCode.sync){
- leader.setSyncHandler(this, sessionId);
- }
- leader.zk.submitRequest(null, sessionId, type, cxid, bb,
- qp.getAuthinfo());
- break;
- default:
- }
- }
- } catch (IOException e) {
- if (s != null && !s.isClosed()) {
- LOG.error("FIXMSG",e);
- }
- } catch (InterruptedException e) {
- LOG.error("FIXMSG",e);
- } finally {
- LOG.warn("******* GOODBYE " + s.getRemoteSocketAddress()
- + " ********");
- // Send the packet of death
- try {
- queuedPackets.put(proposalOfDeath);
- } catch (InterruptedException e) {
- LOG.error("FIXMSG",e);
- }
- shutdown();
- }
- }
-
- public void shutdown() {
- try {
- if (s != null && !s.isClosed()) {
- s.close();
- }
- } catch (IOException e) {
- LOG.error("FIXMSG",e);
- }
- leader.removeFollowerHandler(this);
- }
-
- public long tickOfLastAck() {
- return tickOfLastAck;
- }
-
- /**
- * ping calls from the leader to the followers
- */
- public void ping() {
- QuorumPacket ping = new QuorumPacket(Leader.PING, leader.lastProposed,
- null, null);
- queuePacket(ping);
- }
-
- void queuePacket(QuorumPacket p) {
- queuedPackets.add(p);
- }
-
- public boolean synced() {
- return isAlive()
- && tickOfLastAck >= leader.self.tick - leader.self.syncLimit;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.txn.TxnHeader;
+
+/**
+ * There will be an instance of this class created by the Leader for each
+ * follower. All communication for a given Follower will be handled by this
+ * class.
+ */
+public class FollowerHandler extends Thread {
+ private static final Logger LOG = Logger.getLogger(FollowerHandler.class);
+
+ public Socket s;
+
+ Leader leader;
+
+ long tickOfLastAck;
+
+ /**
+ * The packets to be sent to the follower
+ */
+ LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
+
+ private BinaryInputArchive ia;
+
+ private BinaryOutputArchive oa;
+
+ private BufferedOutputStream bufferedOutput;
+
+ FollowerHandler(Socket s, Leader leader) throws IOException {
+ super("FollowerHandler-" + s.getRemoteSocketAddress());
+ this.s = s;
+ this.leader = leader;
+ leader.addFollowerHandler(this);
+ start();
+ }
+
+ /**
+ * If this packet is queued, the sender thread will exit
+ */
+ QuorumPacket proposalOfDeath = new QuorumPacket();
+
+ /**
+ * This method will use the thread to send packets added to the
+ * queuedPackets list
+ *
+ * @throws InterruptedException
+ */
+ private void sendPackets() throws InterruptedException {
+ long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+ while (true) {
+ QuorumPacket p;
+ p = queuedPackets.take();
+
+ if (p == proposalOfDeath) {
+ // Packet of death!
+ break;
+ }
+ if (p.getType() == Leader.PING) {
+ traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+ }
+ ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
+ try {
+ oa.writeRecord(p, "packet");
+ bufferedOutput.flush();
+ } catch (IOException e) {
+ if (!s.isClosed()) {
+ LOG.warn("Unexpected exception",e);
+ }
+ break;
+ }
+ }
+ }
+
+ static public String packetToString(QuorumPacket p) {
+ if (true)
+ return null;
+ String type = null;
+ String mess = null;
+ Record txn = null;
+ switch (p.getType()) {
+ case Leader.ACK:
+ type = "ACK";
+ break;
+ case Leader.COMMIT:
+ type = "COMMIT";
+ break;
+ case Leader.LASTZXID:
+ type = "LASTZXID";
+ break;
+ case Leader.NEWLEADER:
+ type = "NEWLEADER";
+ break;
+ case Leader.PING:
+ type = "PING";
+ break;
+ case Leader.PROPOSAL:
+ type = "PROPOSAL";
+ BinaryInputArchive ia = BinaryInputArchive
+ .getArchive(new ByteArrayInputStream(p.getData()));
+ TxnHeader hdr = new TxnHeader();
+ try {
+ txn = ZooKeeperServer.deserializeTxn(ia, hdr);
+ // mess = "transaction: " + txn.toString();
+ } catch (IOException e) {
+ LOG.warn("Unexpected exception",e);
+ }
+ break;
+ case Leader.REQUEST:
+ type = "REQUEST";
+ break;
+ case Leader.REVALIDATE:
+ type = "REVALIDATE";
+ ByteArrayInputStream bis = new ByteArrayInputStream(p.getData());
+ DataInputStream dis = new DataInputStream(bis);
+ try {
+ long id = dis.readLong();
+ mess = " sessionid = " + id;
+ } catch (IOException e) {
+ LOG.warn("Unexpected exception", e);
+ }
+
+ break;
+ case Leader.UPTODATE:
+ type = "UPTODATE";
+ break;
+ default:
+ type = "UNKNOWN" + p.getType();
+ }
+ String entry = null;
+ if (type != null) {
+ entry = type + " " + Long.toHexString(p.getZxid()) + " " + mess;
+ }
+ return entry;
+ }
+
+ /**
+ * This thread will receive packets from the follower and process them and
+ * also listen to new connections from new followers.
+ */
+ public void run() {
+ try {
+
+ ia = BinaryInputArchive.getArchive(new BufferedInputStream(s
+ .getInputStream()));
+ bufferedOutput = new BufferedOutputStream(s.getOutputStream());
+ oa = BinaryOutputArchive.getArchive(bufferedOutput);
+
+ QuorumPacket qp = new QuorumPacket();
+ ia.readRecord(qp, "packet");
+ if (qp.getType() != Leader.LASTZXID) {
+ LOG.error("First packet " + qp.toString()
+ + " is not LASTZXID!");
+ return;
+ }
+ long peerLastZxid = qp.getZxid();
+ int packetToSend = Leader.SNAP;
+ boolean logTxns = true;
+
+ long zxidToSend = 0;
+ // we are sending the diff
+ synchronized(leader.zk.committedLog) {
+ if (leader.zk.committedLog.size() != 0) {
+ if ((leader.zk.maxCommittedLog >= peerLastZxid)
+ && (leader.zk.minCommittedLog <= peerLastZxid)) {
+ packetToSend = Leader.DIFF;
+ zxidToSend = leader.zk.maxCommittedLog;
+ for (Proposal propose: leader.zk.committedLog) {
+ if (propose.packet.getZxid() > peerLastZxid) {
+ queuePacket(propose.packet);
+ QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
+ null, null);
+ queuePacket(qcommit);
+
+ }
+ }
+ }
+ }
+ else {
+ logTxns = false;
+ } }
+ long leaderLastZxid = leader.startForwarding(this, peerLastZxid);
+ QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+ leaderLastZxid, null, null);
+ oa.writeRecord(newLeaderQP, "packet");
+ bufferedOutput.flush();
+ // a special case when both the ids are the same
+ if (peerLastZxid == leaderLastZxid) {
+ packetToSend = Leader.DIFF;
+ zxidToSend = leaderLastZxid;
+ }
+ // Check if we decided to send a diff or we need to send a truncate.
+ // We avoid using epochs for truncating because epochs make things
+ // complicated: two epochs might have the same last 32 bits.
+ // Only if we know that there is a committed zxid in the queue that
+ // is less than the one the peer has do we send a trunc; otherwise, to
+ // keep things simple, we just send a snapshot.
+ if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) {
+ // this is the only case that we are sure that
+ // we can ask the follower to truncate the log
+ packetToSend = Leader.TRUNC;
+ zxidToSend = leader.zk.maxCommittedLog;
+
+ }
+ oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
+ bufferedOutput.flush();
+ // only if we are not truncating or fast syncing
+ if (packetToSend == Leader.SNAP) {
+ LOG.warn("Sending snapshot last zxid of peer is "
+ + Long.toHexString(peerLastZxid) + " " + " zxid of leader is "
+ + Long.toHexString(leaderLastZxid));
+ // Dump data to follower
+ leader.zk.snapshot(oa);
+ oa.writeString("BenWasHere", "signature");
+ }
+ bufferedOutput.flush();
+ //
+ // Mutation packets will be queued during the serialize,
+ // so we need to mark when the follower can actually start
+ // using the data
+ //
+ queuedPackets
+ .add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
+
+ // Start sending packets
+ new Thread() {
+ public void run() {
+ Thread.currentThread().setName(
+ "Sender-" + s.getRemoteSocketAddress());
+ try {
+ sendPackets();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted",e);
+ }
+ }
+ }.start();
+
+ while (true) {
+ qp = new QuorumPacket();
+ ia.readRecord(qp, "packet");
+
+ long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+ if (qp.getType() == Leader.PING) {
+ traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+ }
+ ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
+ tickOfLastAck = leader.self.tick;
+
+
+ ByteBuffer bb;
+ long sessionId;
+ int cxid;
+ int type;
+
+ switch (qp.getType()) {
+ case Leader.ACK:
+ leader.processAck(qp.getZxid(), s.getLocalSocketAddress());
+ break;
+ case Leader.PING:
+ // Process the touches
+ ByteArrayInputStream bis = new ByteArrayInputStream(qp
+ .getData());
+ DataInputStream dis = new DataInputStream(bis);
+ while (dis.available() > 0) {
+ long sess = dis.readLong();
+ int to = dis.readInt();
+ leader.zk.touch(sess, to);
+ }
+ break;
+ case Leader.REVALIDATE:
+ bis = new ByteArrayInputStream(qp.getData());
+ dis = new DataInputStream(bis);
+ long id = dis.readLong();
+ int to = dis.readInt();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ dos.writeLong(id);
+ boolean valid = leader.zk.touch(id, to);
+ ZooTrace.logTraceMessage(LOG,
+ ZooTrace.SESSION_TRACE_MASK,
+ "Session " + Long.toHexString(id)
+ + " is valid: "+ valid);
+ dos.writeBoolean(valid);
+ qp.setData(bos.toByteArray());
+ queuedPackets.add(qp);
+ break;
+ case Leader.REQUEST:
+ bb = ByteBuffer.wrap(qp.getData());
+ sessionId = bb.getLong();
+ cxid = bb.getInt();
+ type = bb.getInt();
+ bb = bb.slice();
+ if(type == OpCode.sync){
+ leader.setSyncHandler(this, sessionId);
+ }
+ leader.zk.submitRequest(null, sessionId, type, cxid, bb,
+ qp.getAuthinfo());
+ break;
+ default:
+ }
+ }
+ } catch (IOException e) {
+ if (s != null && !s.isClosed()) {
+ LOG.error("FIXMSG",e);
+ }
+ } catch (InterruptedException e) {
+ LOG.error("FIXMSG",e);
+ } finally {
+ LOG.warn("******* GOODBYE " + s.getRemoteSocketAddress()
+ + " ********");
+ // Send the packet of death
+ try {
+ queuedPackets.put(proposalOfDeath);
+ } catch (InterruptedException e) {
+ LOG.error("FIXMSG",e);
+ }
+ shutdown();
+ }
+ }
+
+ public void shutdown() {
+ try {
+ if (s != null && !s.isClosed()) {
+ s.close();
+ }
+ } catch (IOException e) {
+ LOG.error("FIXMSG",e);
+ }
+ leader.removeFollowerHandler(this);
+ }
+
+ public long tickOfLastAck() {
+ return tickOfLastAck;
+ }
+
+ /**
+ * ping calls from the leader to the followers
+ */
+ public void ping() {
+ QuorumPacket ping = new QuorumPacket(Leader.PING, leader.lastProposed,
+ null, null);
+ queuePacket(ping);
+ }
+
+ void queuePacket(QuorumPacket p) {
+ queuedPackets.add(p);
+ }
+
+ public boolean synced() {
+ return isAlive()
+ && tickOfLastAck >= leader.self.tick - leader.self.syncLimit;
+ }
+}
|