http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3ce1b18b/src/java/main/org/apache/zookeeper/server/quorum/AckRequestProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/AckRequestProcessor.java b/src/java/main/org/apache/zookeeper/server/quorum/AckRequestProcessor.java
deleted file mode 100644
index 8e7e472..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/AckRequestProcessor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-
-
-/**
- * This is a very simple RequestProcessor that simply forwards a request from a
- * previous stage to the leader as an ACK.
- */
-class AckRequestProcessor implements RequestProcessor {
-    private static final Logger LOG = LoggerFactory.getLogger(AckRequestProcessor.class);
-
-    /** Leader that receives the acknowledgements produced by this stage. */
-    Leader leader;
-
-    AckRequestProcessor(Leader leader) {
-        this.leader = leader;
-    }
-
-    /**
-     * Acknowledges the zxid of the given request to the leader, using the id
-     * of the leader's own quorum peer as the acknowledging server.
-     *
-     * @param request request whose zxid is acknowledged
-     */
-    public void processRequest(Request request) {
-        QuorumPeer localPeer = leader.self;
-        if (localPeer == null) {
-            // Without a peer there is no server id to attribute the ACK to.
-            LOG.error("Null QuorumPeer");
-            return;
-        }
-        leader.processAck(localPeer.getId(), request.zxid, null);
-    }
-
-    /** Intentionally empty: this processor only keeps a reference to the leader. */
-    public void shutdown() {
-    }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3ce1b18b/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java b/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
deleted file mode 100644
index 0f8c9c1..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java
+++ /dev/null
@@ -1,983 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-
-import java.util.concurrent.TimeUnit;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.zookeeper.common.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.server.ZooKeeperThread;
-import org.apache.zookeeper.server.quorum.Election;
-import org.apache.zookeeper.server.quorum.Vote;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-
-/**
- * @deprecated This class has been deprecated as of release 3.4.0.
- */
-@Deprecated
-public class AuthFastLeaderElection implements Election {
- private static final Logger LOG = LoggerFactory.getLogger(AuthFastLeaderElection.class);
-
- /* Sequence numbers for messages */
- static int sequencer = 0;
- static int maxTag = 0;
-
- /*
- * Determine how much time a process has to wait once it believes that it
- * has reached the end of leader election.
- */
- static int finalizeWait = 100;
-
- /*
- * Challenge counter to avoid replay attacks
- */
-
- static int challengeCounter = 0;
-
- /*
- * Flag to determine whether to authenticate or not
- */
-
- private boolean authEnabled = false;
-
- /*
- * A vote received from a peer. Instances are filled in by
- * WorkerReceiver.run() from incoming datagrams and handed to
- * lookForLeader() through recvqueue.
- */
- static public class Notification {
- /*
- * Server id of the leader proposed by the sender
- */
- long leader;
-
- /*
- * zxid of the proposed leader
- */
- long zxid;
-
- /*
- * Election epoch carried in the message; lookForLeader() compares it
- * against logicalclock
- */
- long epoch;
-
- /*
- * State of the sender (LOOKING, LEADING or FOLLOWING) as decoded from
- * the message header
- */
- QuorumPeer.ServerState state;
-
- /*
- * Address of the sender, taken from the received datagram
- */
- InetSocketAddress addr;
- }
-
- /*
- * Messages to send, both Notifications and Acks
- */
- static public class ToSend {
-     static enum mType {
-         crequest, challenge, notification, ack
-     }
-
-     /*
-      * Builds a message of the given kind addressed to addr. Every kind
-      * carries the same fields; the only difference is the wire code and
-      * that a notification always advertises the LOOKING state, whatever
-      * state the caller passes in.
-      */
-     ToSend(mType type, long tag, long leader, long zxid, long epoch,
-             ServerState state, InetSocketAddress addr) {
-
-         int code;
-         ServerState advertised = state;
-
-         switch (type) {
-         case crequest:
-             code = 0;
-             break;
-         case challenge:
-             code = 1;
-             break;
-         case notification:
-             code = 2;
-             advertised = QuorumPeer.ServerState.LOOKING;
-             break;
-         case ack:
-             code = 3;
-             break;
-         default:
-             // No other kind exists; leave every field at its default value.
-             return;
-         }
-
-         this.type = code;
-         this.tag = tag;
-         this.leader = leader;
-         this.zxid = zxid;
-         this.epoch = epoch;
-         this.state = advertised;
-         this.addr = addr;
-     }
-
-     /*
-      * Wire code of the message kind: 0 challenge request, 1 challenge,
-      * 2 notification, 3 ack
-      */
-     int type;
-
-     /*
-      * Proposed leader
-      */
-     long leader;
-
-     /*
-      * zxid of the proposed leader
-      */
-     long zxid;
-
-     /*
-      * Epoch
-      */
-     long epoch;
-
-     /*
-      * State advertised in the message header
-      */
-     QuorumPeer.ServerState state;
-
-     /*
-      * Message tag, used to match challenges and acks to this message
-      */
-     long tag;
-
-     /*
-      * Destination address
-      */
-     InetSocketAddress addr;
- }
-
- LinkedBlockingQueue<ToSend> sendqueue;
-
- LinkedBlockingQueue<Notification> recvqueue;
-
- private class Messenger {
-
- final DatagramSocket mySocket;
- long lastProposedLeader;
- long lastProposedZxid;
- long lastEpoch;
- final Set<Long> ackset;
- final ConcurrentHashMap<Long, Long> challengeMap;
- final ConcurrentHashMap<Long, Semaphore> challengeMutex;
- final ConcurrentHashMap<Long, Semaphore> ackMutex;
- final ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>> addrChallengeMap;
-
- /*
-  * Thread that receives election datagrams on the shared socket, decodes
-  * them and either answers directly (challenges, acks) or hands the decoded
-  * vote to lookForLeader() through recvqueue.
-  */
- class WorkerReceiver extends ZooKeeperThread {
-
-     DatagramSocket mySocket;
-     Messenger myMsg;
-
-     WorkerReceiver(DatagramSocket s, Messenger msg) {
-         super("WorkerReceiver-" + s.getRemoteSocketAddress());
-         mySocket = s;
-         myMsg = msg;
-     }
-
-     /*
-      * Stores the challenge received for tag and wakes up the sender that
-      * registered a semaphore for it in challengeMutex. Logs an error when
-      * nobody is waiting. Always returns true.
-      */
-     boolean saveChallenge(long tag, long challenge) {
-         Semaphore s = challengeMutex.get(tag);
-         if (s != null) {
-             synchronized (Messenger.this) {
-                 challengeMap.put(tag, challenge);
-                 challengeMutex.remove(tag);
-             }
-
-             s.release();
-         } else {
-             LOG.error("No challenge mutex object");
-         }
-
-         return true;
-     }
-
-     /*
-      * Queues the notification for lookForLeader() and acknowledges it to
-      * its sender with this peer's current vote, epoch and state.
-      */
-     private void deliverAndAck(Notification n, long tag, Vote current,
-             InetSocketAddress addr) {
-         recvqueue.offer(n);
-
-         ToSend a = new ToSend(ToSend.mType.ack, tag,
-                 current.getId(), current.getZxid(),
-                 logicalclock.get(), self.getPeerState(), addr);
-
-         sendqueue.offer(a);
-     }
-
-     /*
-      * Receive loop. Each datagram is exactly 48 bytes: message type (int),
-      * tag (long), sender state (int), followed by a type-specific payload.
-      */
-     public void run() {
-         byte responseBytes[] = new byte[48];
-         ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
-         DatagramPacket responsePacket = new DatagramPacket(
-                 responseBytes, responseBytes.length);
-         while (true) {
-             // Sleeps on receive
-             try {
-                 responseBuffer.clear();
-                 mySocket.receive(responsePacket);
-             } catch (IOException e) {
-                 LOG.warn("Ignoring exception receiving", e);
-                 // Nothing new arrived: do not parse the previous datagram
-                 // a second time.
-                 continue;
-             }
-             // Receive new message
-             if (responsePacket.getLength() != responseBytes.length) {
-                 LOG.warn("Got a short response: "
-                         + responsePacket.getLength() + " "
-                         + responsePacket.toString());
-                 continue;
-             }
-             responseBuffer.clear();
-             int type = responseBuffer.getInt();
-             if ((type > 3) || (type < 0)) {
-                 LOG.warn("Got bad Msg type: " + type);
-                 continue;
-             }
-             long tag = responseBuffer.getLong();
-
-             // Read the state code exactly once. Reading it again for the
-             // log message would consume four payload bytes and misalign
-             // every field decoded below.
-             int stateCode = responseBuffer.getInt();
-             QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
-             switch (stateCode) {
-             case 0:
-                 ackstate = QuorumPeer.ServerState.LOOKING;
-                 break;
-             case 1:
-                 ackstate = QuorumPeer.ServerState.LEADING;
-                 break;
-             case 2:
-                 ackstate = QuorumPeer.ServerState.FOLLOWING;
-                 break;
-             default:
-                 LOG.warn("unknown type " + stateCode);
-                 break;
-             }
-
-             Vote current = self.getCurrentVote();
-
-             switch (type) {
-             case 0:
-                 // Receive challenge request
-                 ToSend c = new ToSend(ToSend.mType.challenge, tag,
-                         current.getId(), current.getZxid(),
-                         logicalclock.get(), self.getPeerState(),
-                         (InetSocketAddress) responsePacket
-                                 .getSocketAddress());
-                 sendqueue.offer(c);
-                 break;
-             case 1:
-                 // Receive challenge and store somewhere else
-                 long challenge = responseBuffer.getLong();
-                 saveChallenge(tag, challenge);
-
-                 break;
-             case 2:
-                 Notification n = new Notification();
-                 n.leader = responseBuffer.getLong();
-                 n.zxid = responseBuffer.getLong();
-                 n.epoch = responseBuffer.getLong();
-                 n.state = ackstate;
-                 n.addr = (InetSocketAddress) responsePacket
-                         .getSocketAddress();
-
-                 // Remember the best proposal seen so far
-                 if ((myMsg.lastEpoch <= n.epoch)
-                         && ((n.zxid > myMsg.lastProposedZxid)
-                         || ((n.zxid == myMsg.lastProposedZxid)
-                         && (n.leader > myMsg.lastProposedLeader)))) {
-                     myMsg.lastProposedZxid = n.zxid;
-                     myMsg.lastProposedLeader = n.leader;
-                     myMsg.lastEpoch = n.epoch;
-                 }
-
-                 long recChallenge;
-                 InetSocketAddress addr = (InetSocketAddress) responsePacket
-                         .getSocketAddress();
-                 if (authEnabled) {
-                     // Accept the vote only if it echoes the challenge this
-                     // peer issued to that address for this tag.
-                     ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(addr);
-                     if (tmpMap != null) {
-                         Long expected = tmpMap.get(tag);
-                         if (expected != null) {
-                             recChallenge = responseBuffer.getLong();
-
-                             if (expected.longValue() == recChallenge) {
-                                 deliverAndAck(n, tag, current, addr);
-                             } else {
-                                 LOG.warn("Incorrect challenge: "
-                                         + recChallenge + ", "
-                                         + addrChallengeMap.toString());
-                             }
-                         } else {
-                             LOG.warn("No challenge for host: " + addr
-                                     + " " + tag);
-                         }
-                     }
-                 } else {
-                     deliverAndAck(n, tag, current, addr);
-                 }
-                 break;
-
-             // Upon reception of an ack message, remove it from the
-             // queue
-             case 3:
-                 // Record the ack before waking the sender: the sender checks
-                 // ackset as soon as its semaphore is released.
-                 ackset.add(tag);
-
-                 Semaphore s = ackMutex.get(tag);
-
-                 if (s != null) {
-                     s.release();
-                 } else {
-                     LOG.error("Empty ack semaphore");
-                 }
-
-                 if (authEnabled) {
-                     ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(responsePacket
-                             .getSocketAddress());
-                     if (tmpMap != null) {
-                         tmpMap.remove(tag);
-                     } else {
-                         LOG.warn("No such address in the ensemble configuration " + responsePacket
-                                 .getSocketAddress());
-                     }
-                 }
-
-                 // An ack from a peer that is no longer LOOKING also carries
-                 // that peer's vote; pass it on as a notification.
-                 if (ackstate != QuorumPeer.ServerState.LOOKING) {
-                     Notification outofsync = new Notification();
-                     outofsync.leader = responseBuffer.getLong();
-                     outofsync.zxid = responseBuffer.getLong();
-                     outofsync.epoch = responseBuffer.getLong();
-                     outofsync.state = ackstate;
-                     outofsync.addr = (InetSocketAddress) responsePacket
-                             .getSocketAddress();
-
-                     recvqueue.offer(outofsync);
-                 }
-
-                 break;
-             // Default case
-             default:
-                 LOG.warn("Received message of incorrect type " + type);
-                 break;
-             }
-         }
-     }
- }
-
- class WorkerSender extends ZooKeeperThread {
-
- Random rand;
- int maxAttempts;
- int ackWait = finalizeWait;
-
- /*
- * Receives a socket and max number of attempts as input
- */
-
- WorkerSender(int attempts) {
- super("WorkerSender");
- maxAttempts = attempts;
- rand = new Random(java.lang.Thread.currentThread().getId()
- + Time.currentElapsedTime());
- }
-
- /*
-  * Produces a 64-bit challenge: the current challengeCounter value in the
-  * upper 32 bits and a non-negative random int in the lower 32 bits. The
-  * counter is advanced on every call.
-  */
- long genChallenge() {
-     long counterBits = challengeCounter & 0xFFFFFFFFL;
-     challengeCounter++;
-
-     // nextInt(bound) is never negative, so widening keeps the bit pattern.
-     long secretBits = rand.nextInt(java.lang.Integer.MAX_VALUE);
-
-     return (counterBits << 32) | secretBits;
- }
-
- /*
-  * Takes messages from sendqueue one at a time and processes them until
-  * the thread is interrupted while waiting on the queue.
-  */
- public void run() {
-     try {
-         for (;;) {
-             process(sendqueue.take());
-         }
-     } catch (InterruptedException e) {
-         // Interrupted while waiting for work: leave the loop and finish.
-     }
- }
-
- /*
- * Serializes one queued message into a 48-byte datagram and sends it to
- * m.addr. Every packet starts with the message type (int), the tag (long)
- * and the ordinal of the sender state (int); the remaining 32 bytes depend
- * on the message type. Only notifications (type 2) are retried and wait
- * for an ack; the other types are sent once.
- */
- private void process(ToSend m) {
- int attempts = 0;
- byte zeroes[];
- byte requestBytes[] = new byte[48];
- DatagramPacket requestPacket = new DatagramPacket(requestBytes,
- requestBytes.length);
- ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-
- // m.type is the wire code assigned by the ToSend constructor:
- // 0 challenge request, 1 challenge, 2 notification, 3 ack
- switch (m.type) {
- case 0:
- /*
- * Building challenge request packet to send
- */
- requestBuffer.clear();
- requestBuffer.putInt(ToSend.mType.crequest.ordinal());
- requestBuffer.putLong(m.tag);
- requestBuffer.putInt(m.state.ordinal());
- zeroes = new byte[32];
- requestBuffer.put(zeroes);
-
- requestPacket.setLength(48);
- try {
- requestPacket.setSocketAddress(m.addr);
- } catch (IllegalArgumentException e) {
- // Sun doesn't include the address that causes this
- // exception to be thrown, so we wrap the exception
- // in order to capture this critical detail.
- throw new IllegalArgumentException(
- "Unable to set socket address on packet, msg:"
- + e.getMessage() + " with addr:" + m.addr,
- e);
- }
-
- try {
- // Skip the request when a challenge for this tag is already stored
- if (challengeMap.get(m.tag) == null) {
- mySocket.send(requestPacket);
- }
- } catch (IOException e) {
- LOG.warn("Exception while sending challenge: ", e);
- }
-
- break;
- case 1:
- /*
- * Building challenge packet to send
- */
-
- long newChallenge;
- // Challenges are only issued to addresses registered in addrChallengeMap
- ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(m.addr);
- if(tmpMap != null){
- // Reuse the challenge already issued for this tag, if any, so a
- // repeated request gets the same value
- Long tmpLong = tmpMap.get(m.tag);
- if (tmpLong != null) {
- newChallenge = tmpLong;
- } else {
- newChallenge = genChallenge();
- }
-
- tmpMap.put(m.tag, newChallenge);
-
- requestBuffer.clear();
- requestBuffer.putInt(ToSend.mType.challenge.ordinal());
- requestBuffer.putLong(m.tag);
- requestBuffer.putInt(m.state.ordinal());
- requestBuffer.putLong(newChallenge);
- zeroes = new byte[24];
- requestBuffer.put(zeroes);
-
- requestPacket.setLength(48);
- try {
- requestPacket.setSocketAddress(m.addr);
- } catch (IllegalArgumentException e) {
- // Sun doesn't include the address that causes this
- // exception to be thrown, so we wrap the exception
- // in order to capture this critical detail.
- throw new IllegalArgumentException(
- "Unable to set socket address on packet, msg:"
- + e.getMessage() + " with addr:" + m.addr,
- e);
- }
-
-
- try {
- mySocket.send(requestPacket);
- } catch (IOException e) {
- LOG.warn("Exception while sending challenge: ", e);
- }
- } else {
- LOG.error("Address is not in the configuration: " + m.addr);
- }
-
- break;
- case 2:
-
- /*
- * Building notification packet to send
- */
-
- requestBuffer.clear();
- requestBuffer.putInt(m.type);
- requestBuffer.putLong(m.tag);
- requestBuffer.putInt(m.state.ordinal());
- requestBuffer.putLong(m.leader);
- requestBuffer.putLong(m.zxid);
- requestBuffer.putLong(m.epoch);
- // Bytes 40-47 stay zero here; with authentication enabled they are
- // overwritten with the challenge further down
- zeroes = new byte[8];
- requestBuffer.put(zeroes);
-
- requestPacket.setLength(48);
- try {
- requestPacket.setSocketAddress(m.addr);
- } catch (IllegalArgumentException e) {
- // Sun doesn't include the address that causes this
- // exception to be thrown, so we wrap the exception
- // in order to capture this critical detail.
- throw new IllegalArgumentException(
- "Unable to set socket address on packet, msg:"
- + e.getMessage() + " with addr:" + m.addr,
- e);
- }
-
-
- boolean myChallenge = false;
- boolean myAck = false;
-
- // Send the notification up to maxAttempts times, waiting longer for
- // the ack after every failed attempt
- while (attempts < maxAttempts) {
- try {
- /*
- * Try to obtain a challenge only if does not have
- * one yet
- */
-
- if (!myChallenge && authEnabled) {
- // The request is queued, not sent directly, so it goes
- // through process() again as a type 0 message
- ToSend crequest = new ToSend(
- ToSend.mType.crequest, m.tag, m.leader,
- m.zxid, m.epoch,
- QuorumPeer.ServerState.LOOKING, m.addr);
- sendqueue.offer(crequest);
-
- try {
- // Wait ackWait * 2^attempts milliseconds for the challenge
- double timeout = ackWait
- * java.lang.Math.pow(2, attempts);
-
- Semaphore s = new Semaphore(0);
- // NOTE(review): the wait below happens while holding the
- // Messenger monitor, and saveChallenge() takes the same
- // monitor before it stores the challenge and releases this
- // semaphore. It looks like the challenge cannot be stored
- // until this wait times out - confirm.
- synchronized(Messenger.this) {
- challengeMutex.put(m.tag, s);
- s.tryAcquire((long) timeout, TimeUnit.MILLISECONDS);
- myChallenge = challengeMap
- .containsKey(m.tag);
- }
- } catch (InterruptedException e) {
- LOG.warn("Challenge request exception: ", e);
- }
- }
-
- /*
- * If don't have challenge yet, skip sending
- * notification
- */
-
- if (authEnabled && !myChallenge) {
- attempts++;
- continue;
- }
-
- if (authEnabled) {
- // Write the challenge into the last 8 bytes of the packet
- requestBuffer.position(40);
- Long tmpLong = challengeMap.get(m.tag);
- if(tmpLong != null){
- requestBuffer.putLong(tmpLong);
- } else {
- LOG.warn("No challenge with tag: " + m.tag);
- }
- }
- mySocket.send(requestPacket);
- try {
- // Wait ackWait * 10^attempts milliseconds for the receiver
- // thread to release the semaphore registered for this tag
- Semaphore s = new Semaphore(0);
- double timeout = ackWait
- * java.lang.Math.pow(10, attempts);
- ackMutex.put(m.tag, s);
- s.tryAcquire((int) timeout, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOG.warn("Ack exception: ", e);
- }
-
- // The receiver thread adds the tag to ackset when the ack arrives
- if(ackset.remove(m.tag)){
- myAck = true;
- }
-
- } catch (IOException e) {
- LOG.warn("Sending exception: ", e);
- /*
- * Do nothing, just try again
- */
- }
- if (myAck) {
- /*
- * Received ack successfully, so return
- */
- challengeMap.remove(m.tag);
-
- return;
- } else
- attempts++;
- }
- /*
- * Return message to queue for another attempt later if
- * epoch hasn't changed.
- */
- if (m.epoch == logicalclock.get()) {
- challengeMap.remove(m.tag);
- sendqueue.offer(m);
- }
- break;
- case 3:
-
- // Ack packet: carries the leader, zxid and epoch stored in m. No
- // padding is written; the last 8 bytes of the freshly allocated
- // array are already zero.
- requestBuffer.clear();
- requestBuffer.putInt(m.type);
- requestBuffer.putLong(m.tag);
- requestBuffer.putInt(m.state.ordinal());
- requestBuffer.putLong(m.leader);
- requestBuffer.putLong(m.zxid);
- requestBuffer.putLong(m.epoch);
-
- requestPacket.setLength(48);
- try {
- requestPacket.setSocketAddress(m.addr);
- } catch (IllegalArgumentException e) {
- // Sun doesn't include the address that causes this
- // exception to be thrown, so we wrap the exception
- // in order to capture this critical detail.
- throw new IllegalArgumentException(
- "Unable to set socket address on packet, msg:"
- + e.getMessage() + " with addr:" + m.addr,
- e);
- }
-
-
- try {
- mySocket.send(requestPacket);
- } catch (IOException e) {
- LOG.warn("Exception while sending ack: ", e);
- }
- break;
- default:
- LOG.warn("unknown type " + m.type);
- break;
- }
- }
- }
-
- /*
-  * Creates the shared bookkeeping maps, starts the requested number of
-  * sender threads, registers one challenge map per voting member and
-  * finally starts the single receiver thread on the given socket.
-  */
- Messenger(int threads, DatagramSocket s) {
-     mySocket = s;
-     ackset = Collections.<Long>newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
-     challengeMap = new ConcurrentHashMap<Long, Long>();
-     challengeMutex = new ConcurrentHashMap<Long, Semaphore>();
-     ackMutex = new ConcurrentHashMap<Long, Semaphore>();
-     addrChallengeMap = new ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>>();
-     lastProposedLeader = 0;
-     lastProposedZxid = 0;
-     lastEpoch = 0;
-
-     // Sender threads are daemons and are numbered from 1.
-     int started = 0;
-     while (started < threads) {
-         started++;
-         Thread sender = new Thread(new WorkerSender(3),
-                 "WorkerSender Thread: " + started);
-         sender.setDaemon(true);
-         sender.start();
-     }
-
-     // One challenge table per voting member, keyed by that member's
-     // address combined with the local election port.
-     for (QuorumServer member : self.getVotingView().values()) {
-         InetSocketAddress memberAddr = new InetSocketAddress(
-                 member.addr.getAddress(), port);
-         addrChallengeMap.put(memberAddr, new ConcurrentHashMap<Long, Long>());
-     }
-
-     Thread receiver = new Thread(new WorkerReceiver(s, this),
-             "WorkerReceiver Thread");
-     receiver.start();
- }
-
- }
-
- QuorumPeer self;
- int port;
- AtomicLong logicalclock = new AtomicLong(); /* Election instance */
- DatagramSocket mySocket;
- long proposedLeader;
- long proposedZxid;
-
- /*
- * Creates the election object for the given peer and starts its
- * messaging threads. When auth is true, notifications are only accepted
- * if they carry a previously issued challenge.
- */
- public AuthFastLeaderElection(QuorumPeer self,
- boolean auth) {
- // Must be set before starter(): the threads it launches read this flag
- this.authEnabled = auth;
- starter(self);
- }
-
- /*
-  * Creates the election object for the given peer with authentication
-  * disabled.
-  */
- public AuthFastLeaderElection(QuorumPeer self) {
-     this(self, false);
- }
-
- /*
-  * Common initialization for both constructors: records the peer, binds a
-  * datagram socket to this peer's election port, creates the bounded send
-  * and receive queues (twice the number of voting members each) and starts
-  * the Messenger threads.
-  *
-  * Throws RuntimeException, with the SocketException as its cause, when the
-  * election port cannot be bound.
-  */
- private void starter(QuorumPeer self) {
-     this.self = self;
-     port = self.getVotingView().get(self.getId()).electionAddr.getPort();
-     proposedLeader = -1;
-     proposedZxid = -1;
-
-     try {
-         mySocket = new DatagramSocket(port);
-     } catch (SocketException e) {
-         // Keep the port and the original failure: a bare RuntimeException
-         // tells the operator nothing about what went wrong.
-         String msg = "Unable to bind election socket on port " + port;
-         LOG.error(msg, e);
-         throw new RuntimeException(msg, e);
-     }
-     sendqueue = new LinkedBlockingQueue<ToSend>(2 * self.getVotingView().size());
-     recvqueue = new LinkedBlockingQueue<Notification>(2 * self.getVotingView()
-             .size());
-     new Messenger(self.getVotingView().size() * 2, mySocket);
- }
-
- /*
- * Advances the election epoch. Called by lookForLeader() right before it
- * returns a decided vote.
- */
- private void leaveInstance() {
- logicalclock.incrementAndGet();
- }
-
- /*
-  * Queues one notification carrying the current proposal for every server
-  * in the view. Each notification gets its own tag from the shared
-  * sequencer.
-  */
- private void sendNotifications() {
-     for (QuorumServer member : self.getView().values()) {
-         long tag = AuthFastLeaderElection.sequencer++;
-         long epoch = logicalclock.get();
-         InetSocketAddress target = self.getView().get(member.id).electionAddr;
-
-         sendqueue.offer(new ToSend(ToSend.mType.notification, tag,
-                 proposedLeader, proposedZxid, epoch,
-                 QuorumPeer.ServerState.LOOKING, target));
-     }
- }
-
- /*
-  * Tells whether the vote (id, zxid) beats the current proposal: a higher
-  * zxid wins, and on equal zxids the higher server id wins.
-  */
- private boolean totalOrderPredicate(long id, long zxid) {
-     if (zxid != proposedZxid) {
-         return zxid > proposedZxid;
-     }
-     return id > proposedLeader;
- }
-
- /*
-  * Tells whether more than half of the voting members have cast exactly
-  * the vote (l, zxid) in the given set of votes.
-  */
- private boolean termPredicate(HashMap<InetSocketAddress, Vote> votes,
-         long l, long zxid) {
-     int matching = 0;
-     for (Vote cast : votes.values()) {
-         boolean sameLeader = cast.getId() == l;
-         if (sameLeader && cast.getZxid() == zxid) {
-             matching++;
-         }
-     }
-
-     int half = self.getVotingView().size() / 2;
-     return matching > half;
- }
-
- /**
- * There is nothing to shutdown in this implementation of
- * leader election, so we simply have an empty method.
- *
- * NOTE(review): starter() opens a DatagramSocket and Messenger starts
- * sender and receiver threads; none of them is stopped here - confirm
- * that this is intended.
- */
- public void shutdown(){}
-
- /**
- * Invoked in QuorumPeer to find or elect a new leader.
- *
- * Starts a new election epoch, proposes this peer as leader and then
- * exchanges notifications with the other servers until a vote is decided
- * or the peer is no longer in the LOOKING state. The election bean is
- * registered with JMX for the duration of the call.
- *
- * @return the decided vote, or null when the peer left the LOOKING state
- * without this method reaching a decision
- * @throws InterruptedException if interrupted while waiting for
- * notifications or during the finalize wait
- */
- public Vote lookForLeader() throws InterruptedException {
- try {
- self.jmxLeaderElectionBean = new LeaderElectionBean();
- MBeanRegistry.getInstance().register(
- self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- self.jmxLeaderElectionBean = null;
- }
-
- try {
- // Votes from peers that are still LOOKING, keyed by sender address
- HashMap<InetSocketAddress, Vote> recvset =
- new HashMap<InetSocketAddress, Vote>();
-
- // Votes reported by peers that are already LEADING or FOLLOWING
- HashMap<InetSocketAddress, Vote> outofelection =
- new HashMap<InetSocketAddress, Vote>();
-
- logicalclock.incrementAndGet();
-
- // Start by proposing ourselves
- proposedLeader = self.getId();
- proposedZxid = self.getLastLoggedZxid();
-
- LOG.info("Election tally");
- sendNotifications();
-
- /*
- * Loop in which we exchange notifications until we find a leader
- */
-
- while (self.getPeerState() == ServerState.LOOKING) {
- /*
- * Remove next notification from queue, times out after 2 times
- * the termination time
- */
- Notification n = recvqueue.poll(2 * finalizeWait,
- TimeUnit.MILLISECONDS);
-
- /*
- * Sends more notifications if haven't received enough.
- * Otherwise processes new notification.
- */
- if (n == null) {
- // Re-send only if some peer has already answered
- if (((!outofelection.isEmpty()) || (recvset.size() > 1)))
- sendNotifications();
- } else
- switch (n.state) {
- case LOOKING:
- if (n.epoch > logicalclock.get()) {
- // The sender is in a later epoch: adopt it and discard the
- // votes collected so far
- logicalclock.set( n.epoch );
- recvset.clear();
- if (totalOrderPredicate(n.leader, n.zxid)) {
- proposedLeader = n.leader;
- proposedZxid = n.zxid;
- }
- sendNotifications();
- } else if (n.epoch < logicalclock.get()) {
- // Notification from an earlier epoch: leave the switch
- // without recording the vote
- break;
- } else if (totalOrderPredicate(n.leader, n.zxid)) {
- // Same epoch and a better proposal: adopt and re-broadcast it
- proposedLeader = n.leader;
- proposedZxid = n.zxid;
-
- sendNotifications();
- }
-
- recvset.put(n.addr, new Vote(n.leader, n.zxid));
-
- // If have received from all nodes, then terminate
- if (self.getVotingView().size() == recvset.size()) {
- self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
- // if (self.state == ServerState.FOLLOWING) {
- // Thread.sleep(100);
- // }
- leaveInstance();
- return new Vote(proposedLeader, proposedZxid);
-
- } else if (termPredicate(recvset, proposedLeader,
- proposedZxid)) {
- // Otherwise, wait for a fixed amount of time
- LOG.info("Passed predicate");
- Thread.sleep(finalizeWait);
-
- // Notification probe = recvqueue.peek();
-
- // Verify if there is any change in the proposed leader
- // (queued notifications that do not beat the current
- // proposal are dropped)
- while ((!recvqueue.isEmpty())
- && !totalOrderPredicate(
- recvqueue.peek().leader, recvqueue
- .peek().zxid)) {
- recvqueue.poll();
- }
- // An empty queue means nothing better arrived during the wait
- if (recvqueue.isEmpty()) {
- // LOG.warn("Proposed leader: " +
- // proposedLeader);
- self.setPeerState(
- (proposedLeader == self.getId()) ?
- ServerState.LEADING :
- ServerState.FOLLOWING);
-
- leaveInstance();
- return new Vote(proposedLeader, proposedZxid);
- }
- }
- break;
- case LEADING:
- // The sender is already out of the election: follow its vote
- // once a majority of such peers report the same one
- outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-
- if (termPredicate(outofelection, n.leader, n.zxid)) {
-
- self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
-
- leaveInstance();
- return new Vote(n.leader, n.zxid);
- }
- break;
- case FOLLOWING:
- // Handled exactly like the LEADING case above
- outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-
- if (termPredicate(outofelection, n.leader, n.zxid)) {
-
- self.setPeerState((n.leader == self.getId()) ?
- ServerState.LEADING: ServerState.FOLLOWING);
-
- leaveInstance();
- return new Vote(n.leader, n.zxid);
- }
- break;
- default:
- break;
- }
- }
-
- return null;
- } finally {
- // Always unregister the election bean, whichever way we leave
- try {
- if(self.jmxLeaderElectionBean != null){
- MBeanRegistry.getInstance().unregister(
- self.jmxLeaderElectionBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- self.jmxLeaderElectionBean = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3ce1b18b/src/java/main/org/apache/zookeeper/server/quorum/BufferStats.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/BufferStats.java b/src/java/main/org/apache/zookeeper/server/quorum/BufferStats.java
deleted file mode 100644
index a76d80f..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/BufferStats.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-/**
- * Provides live statistics about Jute buffer usage in term of proposal and client request size.
- */
-public class BufferStats {
-    /** Marker stored in every statistic until a first size is recorded. */
-    public static final int INIT_VALUE = -1;
-
-    /** Most recently recorded buffer size. */
-    private int lastBufferSize = INIT_VALUE;
-
-    /** Smallest buffer size recorded since creation or the last reset. */
-    private int minBufferSize = INIT_VALUE;
-
-    /** Largest buffer size recorded since creation or the last reset. */
-    private int maxBufferSize = INIT_VALUE;
-
-    /**
-     * @return the most recently recorded buffer size, or {@link #INIT_VALUE}
-     *         if none has been recorded
-     */
-    public synchronized int getLastBufferSize() {
-        return lastBufferSize;
-    }
-
-    /**
-     * Records a new buffer size and folds it into the minimum and maximum.
-     *
-     * @param value size of the buffer that was just used
-     */
-    public synchronized void setLastBufferSize(int value) {
-        lastBufferSize = value;
-        // The minimum has no meaningful start value, so the first recorded
-        // size always replaces the marker.
-        minBufferSize = (minBufferSize == INIT_VALUE)
-                ? value
-                : Math.min(minBufferSize, value);
-        maxBufferSize = Math.max(maxBufferSize, value);
-    }
-
-    /**
-     * @return the smallest recorded buffer size, or {@link #INIT_VALUE} if
-     *         none has been recorded
-     */
-    public synchronized int getMinBufferSize() {
-        return minBufferSize;
-    }
-
-    /**
-     * @return the largest recorded buffer size, or {@link #INIT_VALUE} if
-     *         none has been recorded
-     */
-    public synchronized int getMaxBufferSize() {
-        return maxBufferSize;
-    }
-
-    /**
-     * Puts all three statistics back to {@link #INIT_VALUE}.
-     */
-    public synchronized void reset() {
-        lastBufferSize = minBufferSize = maxBufferSize = INIT_VALUE;
-    }
-
-    /**
-     * @return the statistics formatted as {@code last/min/max}
-     */
-    @Override
-    public synchronized String toString() {
-        return String.format("%d/%d/%d", lastBufferSize, minBufferSize, maxBufferSize);
-    }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3ce1b18b/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java b/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
deleted file mode 100644
index e87f359..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.WorkerService;
-import org.apache.zookeeper.server.ZooKeeperCriticalThread;
-import org.apache.zookeeper.server.ZooKeeperServerListener;
-
-/**
- * This RequestProcessor matches the incoming committed requests with the
- * locally submitted requests. The trick is that locally submitted requests that
- * change the state of the system will come back as incoming committed requests,
- * so we need to match them up.
- *
- * The CommitProcessor is multi-threaded. Communication between threads is
- * handled via queues, atomics, and wait/notifyAll synchronized on the
- * processor. The CommitProcessor acts as a gateway for allowing requests to
- * continue with the remainder of the processing pipeline. It will allow many
- * read requests but only a single write request to be in flight simultaneously,
- * thus ensuring that write requests are processed in transaction id order.
- *
- * - 1 commit processor main thread, which watches the request queues and
- * assigns requests to worker threads based on their sessionId so that
- * read and write requests for a particular session are always assigned
- * to the same thread (and hence are guaranteed to run in order).
- * - 0-N worker threads, which run the rest of the request processor pipeline
- * on the requests. If configured with 0 worker threads, the primary
- * commit processor thread runs the pipeline directly.
- *
- * Typical (default) thread counts are: on a 32 core machine, 1 commit
- * processor thread and 32 worker threads.
- *
- * Multi-threading constraints:
- * - Each session's requests must be processed in order.
- * - Write requests must be processed in zxid order
- * - Must ensure no race condition between writes in one session that would
- * trigger a watch being set by a read request in another session
- *
- * The current implementation solves the third constraint by simply allowing no
- * read requests to be processed in parallel with write requests.
- */
-public class CommitProcessor extends ZooKeeperCriticalThread implements
- RequestProcessor {
- private static final Logger LOG = LoggerFactory.getLogger(CommitProcessor.class);
-
- /** System property read by start() for the worker thread count. Default: numCores */
- public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS =
- "zookeeper.commitProcessor.numWorkerThreads";
- /** System property read by start() for the worker pool shutdown timeout in ms. Default: 5000 (5s) */
- public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT =
- "zookeeper.commitProcessor.shutdownTimeout";
-
- /**
- * Requests added by processRequest() that run() has not dispatched yet; held here until the commit comes in.
- */
- protected final LinkedBlockingQueue<Request> queuedRequests =
- new LinkedBlockingQueue<Request>();
-
- /**
- * Requests that have been committed: added by commit() and consumed one at a time by processCommitted().
- */
- protected final LinkedBlockingQueue<Request> committedRequests =
- new LinkedBlockingQueue<Request>();
-
- /** Request for which we are currently awaiting a commit; set by run(), cleared by processCommitted() on a match */
- protected final AtomicReference<Request> nextPending =
- new AtomicReference<Request>();
- /** Request currently being committed (ie, sent off to next processor); cleared by CommitWorkRequest.doWork() */
- private final AtomicReference<Request> currentlyCommitting =
- new AtomicReference<Request>();
-
- /** The number of requests currently being processed, i.e. scheduled by sendToNextProcessor() and not yet finished */
- protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);
-
- RequestProcessor nextProcessor; // downstream stage; every released request is passed to its processRequest()
-
- protected volatile boolean stopped = true; // true until start(); set back to true by halt()
- private long workerShutdownTimeoutMS; // assigned in start(); used by shutdown() when joining the worker pool
- protected WorkerService workerPool; // created in start() unless it was already assigned
-
- /**
- * This flag indicates whether we need to wait for a response to come back from the
- * leader or we just let the sync operation flow through like a read. The flag will
- * be false if the CommitProcessor is in a Leader pipeline. It is consulted only by needCommit().
- */
- boolean matchSyncs;
-
- public CommitProcessor(RequestProcessor nextProcessor, String id, // id is only used to build the name passed to the superclass
- boolean matchSyncs, ZooKeeperServerListener listener) {
- super("CommitProcessor:" + id, listener);
- this.nextProcessor = nextProcessor; // stage that receives every request released by this processor
- this.matchSyncs = matchSyncs; // whether sync requests must wait for a commit (see needCommit)
- } // nothing is started here: stopped stays true and workerPool stays unset until start()
-
- private boolean isProcessingRequest() { // true while any request scheduled by sendToNextProcessor() is unfinished
- return numRequestsProcessing.get() != 0;
- }
-
- private boolean isWaitingForCommit() { // true while run() has parked a request in nextPending
- return nextPending.get() != null;
- }
-
- private boolean isProcessingCommit() { // true from processCommitted() handing off a commit until doWork() clears it
- return currentlyCommitting.get() != null;
- }
-
- protected boolean needCommit(Request request) { // true if run() must hold the request in nextPending until its commit arrives
- switch (request.type) {
- case OpCode.create:
- case OpCode.create2:
- case OpCode.createTTL:
- case OpCode.createContainer:
- case OpCode.delete:
- case OpCode.deleteContainer:
- case OpCode.setData:
- case OpCode.reconfig:
- case OpCode.multi:
- case OpCode.setACL:
- return true; // every op type listed above always waits for a commit
- case OpCode.sync:
- return matchSyncs; // sync waits only when matchSyncs is set
- case OpCode.createSession:
- case OpCode.closeSession:
- return !request.isLocalSession(); // session ops wait unless isLocalSession() reports true
- default:
- return false; // every other type is sent to the next processor right away
- }
- }
-
- @Override
- public void run() {
- Request request;
- try {
- while (!stopped) {
- synchronized(this) {
- while ( // sleep until a queued request may be dispatched or a committed request may be handed off
- !stopped &&
- ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) &&
- (committedRequests.isEmpty() || isProcessingRequest()))) {
- wait(); // released by wakeup(), which calls notifyAll() on this object
- }
- }
-
- /*
- * Processing queuedRequests: dispatch queued requests one by one until we
- * find one that needs a commit; that one is parked in nextPending. Nothing
- * is taken from the queue while a commit is awaited or being processed.
- */
- while (!stopped && !isWaitingForCommit() &&
- !isProcessingCommit() &&
- (request = queuedRequests.poll()) != null) {
- if (needCommit(request)) {
- nextPending.set(request); // makes isWaitingForCommit() true, which ends this loop
- } else {
- sendToNextProcessor(request);
- }
- }
-
- /*
- * Processing committedRequests: check and see if the commit
- * came in for the pending request. At most one committed request is
- * handed off per pass, and only when no other request is being processed.
- */
- processCommitted();
- }
- } catch (Throwable e) {
- handleException(this.getName(), e); // any failure leaves the loop and is reported via handleException
- }
- LOG.info("CommitProcessor exited loop!");
- }
-
- /*
- * Hands at most one committed request to the next processor. Separated
- * from the main run loop for test purposes (ZOOKEEPER-1863).
- */
- protected void processCommitted() {
- Request request;
-
- if (!stopped && !isProcessingRequest() &&
- (committedRequests.peek() != null)) {
-
- /*
- * ZOOKEEPER-1863: proceed only if queuedRequests is empty or a
- * request is already waiting for its commit; otherwise return so
- * that run() examines the newly queued requests first.
- */
- if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {
- return;
- }
- request = committedRequests.poll();
-
- /*
- * We match with nextPending (same sessionId and cxid) so that we
- * can move to the next request when it is committed. We also want
- * to use nextPending because it has the cnxn member set
- * properly.
- */
- Request pending = nextPending.get();
- if (pending != null &&
- pending.sessionId == request.sessionId &&
- pending.cxid == request.cxid) {
- // we want to send our version of the request, so copy the
- // header, txn and zxid of the committed request into it
- pending.setHdr(request.getHdr());
- pending.setTxn(request.getTxn());
- pending.zxid = request.zxid;
- // Set currentlyCommitting so we will block until this
- // completes. Cleared by CommitWorkRequest after
- // nextProcessor returns.
- currentlyCommitting.set(pending);
- nextPending.set(null);
- sendToNextProcessor(pending);
- } else {
- // this request came from someone else so just
- // send the commit packet; nextPending is left untouched
- currentlyCommitting.set(request);
- sendToNextProcessor(request);
- }
- }
- }
-
- @Override
- public void start() { // reads configuration, ensures a worker pool exists, then starts the thread
- int numCores = Runtime.getRuntime().availableProcessors();
- int numWorkerThreads = Integer.getInteger( // system property, defaulting to the number of available processors
- ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, numCores);
- workerShutdownTimeoutMS = Long.getLong( // system property, defaulting to 5000 ms
- ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000);
-
- LOG.info("Configuring CommitProcessor with "
- + (numWorkerThreads > 0 ? numWorkerThreads : "no")
- + " worker threads.");
- if (workerPool == null) { // keep a pool that was assigned beforehand (the field is protected)
- workerPool = new WorkerService(
- "CommitProcWork", numWorkerThreads, true);
- }
- stopped = false; // cleared before super.start() so that run() enters its loop
- super.start();
- }
-
- /**
- * Schedule final request processing; if a worker thread pool is not being
- * used, processing is done directly by this thread. Counts the request as in flight before scheduling it.
- */
- private void sendToNextProcessor(Request request) {
- numRequestsProcessing.incrementAndGet(); // matched by the decrement in CommitWorkRequest.doWork()
- workerPool.schedule(new CommitWorkRequest(request), request.sessionId); // the session id is handed to the pool with the work item
- }
-
- /**
- * CommitWorkRequest is a small wrapper class to allow
- * downstream processing to be run using the WorkerService; it also does the in-flight bookkeeping afterwards.
- */
- private class CommitWorkRequest extends WorkerService.WorkRequest {
- private final Request request;
-
- CommitWorkRequest(Request request) {
- this.request = request;
- }
-
- @Override
- public void cleanup() { // NOTE(review): presumably called by WorkerService when doWork() fails - confirm
- if (!stopped) {
- LOG.error("Exception thrown by downstream processor,"
- + " unable to continue.");
- CommitProcessor.this.halt(); // stop the whole processor; when already stopped nothing is done
- }
- }
-
- public void doWork() throws RequestProcessorException {
- try {
- nextProcessor.processRequest(request);
- } finally {
- // If this request is the commit request that was blocking
- // the processor, clear it (no effect for any other request).
- currentlyCommitting.compareAndSet(request, null);
-
- /*
- * Decrement outstanding request count. The processor may be
- * blocked at the moment because it is waiting for the pipeline
- * to drain. In that case, wake it up if there are pending
- * requests. This runs even if processRequest threw.
- */
- if (numRequestsProcessing.decrementAndGet() == 0) {
- if (!queuedRequests.isEmpty() ||
- !committedRequests.isEmpty()) {
- wakeup();
- }
- }
- }
- }
- }
-
- synchronized private void wakeup() { // takes this object's monitor, which notifyAll() requires
- notifyAll(); // releases the wait() in run() so it re-checks its conditions
- }
-
- public void commit(Request request) { // queues a committed request for processCommitted()
- if (stopped || request == null) { // silently dropped when stopped or when given null
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing request:: " + request);
- }
- committedRequests.add(request);
- if (!isProcessingCommit()) { // while a commit is in flight, doWork() wakes the thread when it finishes
- wakeup();
- }
- }
-
- public void processRequest(Request request) { // queues a locally submitted request for run() to dispatch
- if (stopped) { // silently dropped once the processor is stopped (or not yet started)
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing request:: " + request);
- }
- queuedRequests.add(request);
- if (!isWaitingForCommit()) { // run() takes nothing from the queue while a commit is awaited, so no wakeup then
- wakeup();
- }
- }
-
- private void halt() { // stops the main loop and the worker pool; does not wait for workers to finish
- stopped = true;
- wakeup(); // lets run() leave wait() and observe the stopped flag
- queuedRequests.clear(); // only queuedRequests is cleared here; committedRequests is left as is
- if (workerPool != null) { // null when start() was never called
- workerPool.stop();
- }
- }
-
- public void shutdown() { // halts this processor, waits for its workers, then shuts down the next stage
- LOG.info("Shutting down");
-
- halt();
-
- if (workerPool != null) {
- workerPool.join(workerShutdownTimeoutMS); // wait at most the timeout configured in start()
- }
-
- if (nextProcessor != null) {
- nextProcessor.shutdown(); // done last, after the worker join above has returned
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/3ce1b18b/src/java/main/org/apache/zookeeper/server/quorum/Election.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Election.java b/src/java/main/org/apache/zookeeper/server/quorum/Election.java
deleted file mode 100644
index 8990638..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/Election.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-
-import org.apache.zookeeper.server.quorum.Vote;
-
-public interface Election { // contract for a leader election implementation; no implementation is visible here
- public Vote lookForLeader() throws InterruptedException; // NOTE(review): presumably blocks until a leader vote is decided - confirm against implementations
- public void shutdown(); // NOTE(review): presumably stops the election and frees its resources - confirm
-}
|