http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0aa3b3f/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java b/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
deleted file mode 100644
index c9c5370..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ZooKeeperCriticalThread;
-import org.apache.zookeeper.server.ZooTrace;
-
-/**
- * This RequestProcessor forwards any requests that modify the state of the
- * system to the Leader.
- */
-public class ObserverRequestProcessor extends ZooKeeperCriticalThread implements
- RequestProcessor {
- private static final Logger LOG = LoggerFactory.getLogger(ObserverRequestProcessor.class);
-
- ObserverZooKeeperServer zks;
-
- RequestProcessor nextProcessor;
-
- // We keep a queue of requests. As requests get submitted they are
- // stored here. The queue is drained in the run() method.
- LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
-
- boolean finished = false;
-
- /**
- * Constructor - takes an ObserverZooKeeperServer to associate with
- * and the next processor to pass requests to after we're finished.
- * @param zks
- * @param nextProcessor
- */
- public ObserverRequestProcessor(ObserverZooKeeperServer zks,
- RequestProcessor nextProcessor) {
- super("ObserverRequestProcessor:" + zks.getServerId(), zks
- .getZooKeeperServerListener());
- this.zks = zks;
- this.nextProcessor = nextProcessor;
- }
-
- @Override
- public void run() {
- try {
- while (!finished) {
- Request request = queuedRequests.take();
- if (LOG.isTraceEnabled()) {
- ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
- 'F', request, "");
- }
- if (request == Request.requestOfDeath) {
- break;
- }
- // We want to queue the request to be processed before we submit
- // the request to the leader so that we are ready to receive
- // the response
- nextProcessor.processRequest(request);
-
- // We now ship the request to the leader. As with all
- // other quorum operations, sync also follows this code
- // path, but different from others, we need to keep track
- // of the sync operations this Observer has pending, so we
- // add it to pendingSyncs.
- switch (request.type) {
- case OpCode.sync:
- zks.pendingSyncs.add(request);
- zks.getObserver().request(request);
- break;
- case OpCode.create:
- case OpCode.delete:
- case OpCode.setData:
- case OpCode.setACL:
- case OpCode.createSession:
- case OpCode.closeSession:
- case OpCode.multi:
- zks.getObserver().request(request);
- break;
- }
- }
- } catch (Exception e) {
- handleException(this.getName(), e);
- }
- LOG.info("ObserverRequestProcessor exited loop!");
- }
-
- /**
- * Simply queue the request, which will be processed in FIFO order.
- */
- public void processRequest(Request request) {
- if (!finished) {
- queuedRequests.add(request);
- }
- }
-
- /**
- * Shutdown the processor.
- */
- public void shutdown() {
- LOG.info("Shutting down");
- finished = true;
- queuedRequests.clear();
- queuedRequests.add(Request.requestOfDeath);
- nextProcessor.shutdown();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0aa3b3f/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
deleted file mode 100644
index f024948..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.server.quorum;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.server.FinalRequestProcessor;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.SyncRequestProcessor;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-/**
- * A ZooKeeperServer for the Observer node type. Not much is different, but
- * we anticipate specializing the request processors in the future.
- *
- */
-public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
- private static final Logger LOG =
- LoggerFactory.getLogger(ObserverZooKeeperServer.class);
-
- /**
- * Enable since request processor for writing txnlog to disk and
- * take periodic snapshot. Default is ON.
- */
-
- private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
-
- /*
- * Request processors
- */
- private CommitProcessor commitProcessor;
- private SyncRequestProcessor syncProcessor;
-
- /*
- * Pending sync requests
- */
- ConcurrentLinkedQueue<Request> pendingSyncs =
- new ConcurrentLinkedQueue<Request>();
-
- ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
- DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
- super(logFactory, self.tickTime, self.minSessionTimeout,
- self.maxSessionTimeout, treeBuilder, zkDb, self);
- LOG.info("syncEnabled =" + syncRequestProcessorEnabled);
- }
-
- public Observer getObserver() {
- return self.observer;
- }
-
- @Override
- public Learner getLearner() {
- return self.observer;
- }
-
- /**
- * Unlike a Follower, which sees a full request only during the PROPOSAL
- * phase, Observers get all the data required with the INFORM packet.
- * This method commits a request that has been unpacked by from an INFORM
- * received from the Leader.
- *
- * @param request
- */
- public void commitRequest(Request request) {
- if (syncRequestProcessorEnabled) {
- // Write to txnlog and take periodic snapshot
- syncProcessor.processRequest(request);
- }
- commitProcessor.commit(request);
- }
-
- /**
- * Set up the request processors for an Observer:
- * firstProcesor->commitProcessor->finalProcessor
- */
- @Override
- protected void setupRequestProcessors() {
- // We might consider changing the processor behaviour of
- // Observers to, for example, remove the disk sync requirements.
- // Currently, they behave almost exactly the same as followers.
- RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- commitProcessor = new CommitProcessor(finalProcessor,
- Long.toString(getServerId()), true,
- getZooKeeperServerListener());
- commitProcessor.start();
- firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
- ((ObserverRequestProcessor) firstProcessor).start();
-
- /*
- * Observer should write to disk, so that the it won't request
- * too old txn from the leader which may lead to getting an entire
- * snapshot.
- *
- * However, this may degrade performance as it has to write to disk
- * and do periodic snapshot which may double the memory requirements
- */
- if (syncRequestProcessorEnabled) {
- syncProcessor = new SyncRequestProcessor(this, null);
- syncProcessor.start();
- }
- }
-
- /*
- * Process a sync request
- */
- synchronized public void sync(){
- if(pendingSyncs.size() ==0){
- LOG.warn("Not expecting a sync.");
- return;
- }
-
- Request r = pendingSyncs.remove();
- commitProcessor.commit(r);
- }
-
- @Override
- public String getState() {
- return "observer";
- };
-
- @Override
- public synchronized void shutdown() {
- if (!canShutdown()) {
- LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
- return;
- }
- super.shutdown();
- if (syncRequestProcessorEnabled && syncProcessor != null) {
- syncProcessor.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0aa3b3f/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java b/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
deleted file mode 100644
index 641262e..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.SyncRequestProcessor;
-import org.apache.zookeeper.server.quorum.Leader.XidRolloverException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This RequestProcessor simply forwards requests to an AckRequestProcessor and
- * SyncRequestProcessor.
- */
-public class ProposalRequestProcessor implements RequestProcessor {
- private static final Logger LOG =
- LoggerFactory.getLogger(ProposalRequestProcessor.class);
-
- LeaderZooKeeperServer zks;
-
- RequestProcessor nextProcessor;
-
- SyncRequestProcessor syncProcessor;
-
- public ProposalRequestProcessor(LeaderZooKeeperServer zks,
- RequestProcessor nextProcessor) {
- this.zks = zks;
- this.nextProcessor = nextProcessor;
- AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
- syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
- }
-
- /**
- * initialize this processor
- */
- public void initialize() {
- syncProcessor.start();
- }
-
- public void processRequest(Request request) throws RequestProcessorException {
- // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
- // request.type + " id = " + request.sessionId);
- // request.addRQRec(">prop");
-
-
- /* In the following IF-THEN-ELSE block, we process syncs on the leader.
- * If the sync is coming from a follower, then the follower
- * handler adds it to syncHandler. Otherwise, if it is a client of
- * the leader that issued the sync command, then syncHandler won't
- * contain the handler. In this case, we add it to syncHandler, and
- * call processRequest on the next processor.
- */
-
- if(request instanceof LearnerSyncRequest){
- zks.getLeader().processSync((LearnerSyncRequest)request);
- } else {
- nextProcessor.processRequest(request);
- if (request.hdr != null) {
- // We need to sync and get consensus on any transactions
- try {
- zks.getLeader().propose(request);
- } catch (XidRolloverException e) {
- throw new RequestProcessorException(e.getMessage(), e);
- }
- syncProcessor.processRequest(request);
- }
- }
- }
-
- public void shutdown() {
- LOG.info("Shutting down");
- nextProcessor.shutdown();
- syncProcessor.shutdown();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0aa3b3f/src/java/main/org/apache/zookeeper/server/quorum/ProposalStats.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/ProposalStats.java b/src/java/main/org/apache/zookeeper/server/quorum/ProposalStats.java
deleted file mode 100644
index 2f3a9c7..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/ProposalStats.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-/**
- * Provides live statistics about a running Leader.
- */
-public class ProposalStats {
- /**
- * Size of the last generated proposal. This should fit into server's jute.maxbuffer setting.
- */
- private int lastProposalSize = -1;
-
- /**
- * Size of the smallest proposal which has been generated since the server was started.
- */
- private int minProposalSize = -1;
-
- /**
- * Size of the largest proposal which has been generated since the server was started.
- */
- private int maxProposalSize = -1;
-
- public synchronized int getLastProposalSize() {
- return lastProposalSize;
- }
-
- synchronized void setLastProposalSize(int value) {
- lastProposalSize = value;
- if (minProposalSize == -1 || value < minProposalSize) {
- minProposalSize = value;
- }
- if (value > maxProposalSize) {
- maxProposalSize = value;
- }
- }
-
- public synchronized int getMinProposalSize() {
- return minProposalSize;
- }
-
- public synchronized int getMaxProposalSize() {
- return maxProposalSize;
- }
-
- public synchronized void reset() {
- lastProposalSize = -1;
- minProposalSize = -1;
- maxProposalSize = -1;
- }
-
- public synchronized String toString() {
- return String.format("%d/%d/%d", lastProposalSize, minProposalSize, maxProposalSize);
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0aa3b3f/src/java/main/org/apache/zookeeper/server/quorum/QuorumBean.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumBean.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumBean.java
deleted file mode 100644
index ef4036a..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumBean.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.zookeeper.jmx.ZKMBeanInfo;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-
-public class QuorumBean implements QuorumMXBean, ZKMBeanInfo {
- private final QuorumPeer peer;
- private final String name;
-
- public QuorumBean(QuorumPeer peer){
- this.peer = peer;
- name = "ReplicatedServer_id" + peer.getMyid();
- }
-
- public String getName() {
- return name;
- }
-
- public boolean isHidden() {
- return false;
- }
-
- public int getQuorumSize() {
- return peer.getQuorumSize();
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0aa3b3f/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
deleted file mode 100644
index ec6be4a..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++ /dev/null
@@ -1,1152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.zookeeper.server.ZooKeeperThread;
-import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
-import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class implements a connection manager for leader election using TCP. It
- * maintains one connection for every pair of servers. The tricky part is to
- * guarantee that there is exactly one connection for every pair of servers that
- * are operating correctly and that can communicate over the network.
- *
- * If two servers try to start a connection concurrently, then the connection
- * manager uses a very simple tie-breaking mechanism to decide which connection
- * to drop based on the IP addressed of the two parties.
- *
- * For every peer, the manager maintains a queue of messages to send. If the
- * connection to any particular peer drops, then the sender thread puts the
- * message back on the list. As this implementation currently uses a queue
- * implementation to maintain messages to send to another peer, we add the
- * message to the tail of the queue, thus changing the order of messages.
- * Although this is not a problem for the leader election, it could be a problem
- * when consolidating peer communication. This is to be verified, though.
- *
- */
-
-public class QuorumCnxManager {
- private static final Logger LOG = LoggerFactory.getLogger(QuorumCnxManager.class);
-
- /*
- * Maximum capacity of thread queues
- */
- static final int RECV_CAPACITY = 100;
- // Initialized to 1 to prevent sending
- // stale notifications to peers
- static final int SEND_CAPACITY = 1;
-
- static final int PACKETMAXSIZE = 1024 * 512;
-
- /*
- * Max buffer size to be read from the network.
- */
- static public final int maxBuffer = 2048;
-
- /*
- * Negative counter for observer server ids.
- */
-
- private AtomicLong observerCounter = new AtomicLong(-1);
-
- /*
- * Connection time out value in milliseconds
- */
-
- private int cnxTO = 5000;
-
- /*
- * Local IP address
- */
- final long mySid;
- final int socketTimeout;
- final Map<Long, QuorumPeer.QuorumServer> view;
- final boolean tcpKeepAlive = Boolean.getBoolean("zookeeper.tcpKeepAlive");
- final boolean listenOnAllIPs;
- private ThreadPoolExecutor connectionExecutor;
- private final Set<Long> inprogressConnections = Collections
- .synchronizedSet(new HashSet<Long>());
- private QuorumAuthServer authServer;
- private QuorumAuthLearner authLearner;
- private boolean quorumSaslAuthEnabled;
- /*
- * Counter to count connection processing threads.
- */
- private AtomicInteger connectionThreadCnt = new AtomicInteger(0);
-
- /*
- * Mapping from Peer to Thread number
- */
- final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
- final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
- final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
-
- /*
- * Reception queue
- */
- public final ArrayBlockingQueue<Message> recvQueue;
- /*
- * Object to synchronize access to recvQueue
- */
- private final Object recvQLock = new Object();
-
- /*
- * Shutdown flag
- */
-
- volatile boolean shutdown = false;
-
- /*
- * Listener thread
- */
- public final Listener listener;
-
- /*
- * Counter to count worker threads
- */
- private AtomicInteger threadCnt = new AtomicInteger(0);
-
- static public class Message {
-
- Message(ByteBuffer buffer, long sid) {
- this.buffer = buffer;
- this.sid = sid;
- }
-
- ByteBuffer buffer;
- long sid;
- }
-
- public QuorumCnxManager(final long mySid,
- Map<Long,QuorumPeer.QuorumServer> view,
- QuorumAuthServer authServer,
- QuorumAuthLearner authLearner,
- int socketTimeout,
- boolean listenOnAllIPs,
- int quorumCnxnThreadsSize,
- boolean quorumSaslAuthEnabled) {
- this(mySid, view, authServer, authLearner, socketTimeout, listenOnAllIPs,
- quorumCnxnThreadsSize, quorumSaslAuthEnabled, new ConcurrentHashMap<Long, SendWorker>());
- }
-
- // visible for testing
- public QuorumCnxManager(final long mySid,
- Map<Long,QuorumPeer.QuorumServer> view,
- QuorumAuthServer authServer,
- QuorumAuthLearner authLearner,
- int socketTimeout,
- boolean listenOnAllIPs,
- int quorumCnxnThreadsSize,
- boolean quorumSaslAuthEnabled,
- ConcurrentHashMap<Long, SendWorker> senderWorkerMap) {
- this.senderWorkerMap = senderWorkerMap;
-
- this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
- this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
- this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
- String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
- if(cnxToValue != null){
- this.cnxTO = Integer.parseInt(cnxToValue);
- }
-
- this.mySid = mySid;
- this.socketTimeout = socketTimeout;
- this.view = view;
- this.listenOnAllIPs = listenOnAllIPs;
-
- initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
- quorumSaslAuthEnabled);
-
- // Starts listener thread that waits for connection requests
- listener = new Listener();
- }
-
- private void initializeAuth(final long mySid,
- final QuorumAuthServer authServer,
- final QuorumAuthLearner authLearner,
- final int quorumCnxnThreadsSize,
- final boolean quorumSaslAuthEnabled) {
- this.authServer = authServer;
- this.authLearner = authLearner;
- this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;
- if (!this.quorumSaslAuthEnabled) {
- LOG.debug("Not initializing connection executor as quorum sasl auth is disabled");
- return;
- }
-
- // init connection executors
- final AtomicInteger threadIndex = new AtomicInteger(1);
- SecurityManager s = System.getSecurityManager();
- final ThreadGroup group = (s != null) ? s.getThreadGroup()
- : Thread.currentThread().getThreadGroup();
- ThreadFactory daemonThFactory = new ThreadFactory() {
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r, "QuorumConnectionThread-"
- + "[myid=" + mySid + "]-"
- + threadIndex.getAndIncrement());
- return t;
- }
- };
- this.connectionExecutor = new ThreadPoolExecutor(3,
- quorumCnxnThreadsSize, 60, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(), daemonThFactory);
- this.connectionExecutor.allowCoreThreadTimeOut(true);
- }
-
- /**
- * Invokes initiateConnection for testing purposes
- *
- * @param sid
- */
- public void testInitiateConnection(long sid) throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Opening channel to server " + sid);
- }
- Socket sock = new Socket();
- setSockOpts(sock);
- sock.connect(QuorumPeer.viewToVotingView(view).get(sid).electionAddr,
- cnxTO);
- initiateConnection(sock, sid);
- }
-
- /**
- * If this server has initiated the connection, then it gives up on the
- * connection if it loses challenge. Otherwise, it keeps the connection.
- */
- public void initiateConnection(final Socket sock, final Long sid) {
- try {
- startConnection(sock, sid);
- } catch (IOException e) {
- LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
- new Object[] { sid, sock.getRemoteSocketAddress() }, e);
- closeSocket(sock);
- return;
- }
- }
-
- /**
- * Server will initiate the connection request to its peer server
- * asynchronously via separate connection thread.
- */
- public void initiateConnectionAsync(final Socket sock, final Long sid) {
- if(!inprogressConnections.add(sid)){
- // simply return as there is a connection request to
- // server 'sid' already in progress.
- LOG.debug("Connection request to server id: {} is already in progress, so skipping this request",
- sid);
- closeSocket(sock);
- return;
- }
- try {
- connectionExecutor.execute(
- new QuorumConnectionReqThread(sock, sid));
- connectionThreadCnt.incrementAndGet();
- } catch (Throwable e) {
- // Imp: Safer side catching all type of exceptions and remove 'sid'
- // from inprogress connections. This is to avoid blocking further
- // connection requests from this 'sid' in case of errors.
- inprogressConnections.remove(sid);
- LOG.error("Exception while submitting quorum connection request", e);
- closeSocket(sock);
- }
- }
-
- /**
- * Thread to send connection request to peer server.
- */
- private class QuorumConnectionReqThread extends ZooKeeperThread {
- final Socket sock;
- final Long sid;
- QuorumConnectionReqThread(final Socket sock, final Long sid) {
- super("QuorumConnectionReqThread-" + sid);
- this.sock = sock;
- this.sid = sid;
- }
-
- @Override
- public void run() {
- try{
- initiateConnection(sock, sid);
- } finally {
- inprogressConnections.remove(sid);
- }
- }
- }
-
- private boolean startConnection(Socket sock, Long sid)
- throws IOException {
- DataOutputStream dout = null;
- DataInputStream din = null;
- try {
- // Sending id and challenge
- dout = new DataOutputStream(sock.getOutputStream());
- dout.writeLong(this.mySid);
- dout.flush();
-
- din = new DataInputStream(
- new BufferedInputStream(sock.getInputStream()));
- } catch (IOException e) {
- LOG.warn("Ignoring exception reading or writing challenge: ", e);
- closeSocket(sock);
- return false;
- }
-
- // authenticate learner
- authLearner.authenticate(sock, view.get(sid).hostname);
-
- // If lost the challenge, then drop the new connection
- if (sid > this.mySid) {
- LOG.info("Have smaller server identifier, so dropping the " +
- "connection: (" + sid + ", " + this.mySid + ")");
- closeSocket(sock);
- // Otherwise proceed with the connection
- } else {
- SendWorker sw = new SendWorker(sock, sid);
- RecvWorker rw = new RecvWorker(sock, din, sid, sw);
- sw.setRecv(rw);
-
- SendWorker vsw = senderWorkerMap.get(sid);
-
- if(vsw != null)
- vsw.finish();
-
- senderWorkerMap.put(sid, sw);
- queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
-
- sw.start();
- rw.start();
-
- return true;
-
- }
- return false;
- }
-
- /**
- * If this server receives a connection request, then it gives up on the new
- * connection if it wins. Notice that it checks whether it has a connection
- * to this server already or not. If it does, then it sends the smallest
- * possible long value to lose the challenge.
- *
- */
- public void receiveConnection(final Socket sock) {
- DataInputStream din = null;
- try {
- din = new DataInputStream(
- new BufferedInputStream(sock.getInputStream()));
-
- handleConnection(sock, din);
- } catch (IOException e) {
- LOG.error("Exception handling connection, addr: {}, closing server connection",
- sock.getRemoteSocketAddress());
- closeSocket(sock);
- }
- }
-
- /**
- * Server receives a connection request and handles it asynchronously via
- * separate thread.
- */
- public void receiveConnectionAsync(final Socket sock) {
- try {
- connectionExecutor.execute(
- new QuorumConnectionReceiverThread(sock));
- connectionThreadCnt.incrementAndGet();
- } catch (Throwable e) {
- LOG.error("Exception handling connection, addr: {}, closing server connection",
- sock.getRemoteSocketAddress());
- closeSocket(sock);
- }
- }
-
- /**
- * Thread to receive connection request from peer server.
- */
- private class QuorumConnectionReceiverThread extends ZooKeeperThread {
- private final Socket sock;
- QuorumConnectionReceiverThread(final Socket sock) {
- super("QuorumConnectionReceiverThread-" + sock.getRemoteSocketAddress());
- this.sock = sock;
- }
-
- @Override
- public void run() {
- receiveConnection(sock);
- }
- }
-
- private void handleConnection(Socket sock, DataInputStream din)
- throws IOException {
- Long sid = null;
- try {
- // Read server id
- sid = din.readLong();
- if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
- sid = din.readLong();
-
- // next comes the #bytes in the remainder of the message
- // note that 0 bytes is fine (old servers)
- int num_remaining_bytes = din.readInt();
- if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
- LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
- closeSocket(sock);
- return;
- }
- byte[] b = new byte[num_remaining_bytes];
-
- // remove the remainder of the message from din
- int num_read = din.read(b);
- if (num_read != num_remaining_bytes) {
- LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
- }
- }
- if (sid == QuorumPeer.OBSERVER_ID) {
- /*
- * Choose identifier at random. We need a value to identify
- * the connection.
- */
- sid = observerCounter.getAndDecrement();
- LOG.info("Setting arbitrary identifier to observer: " + sid);
- }
- } catch (IOException e) {
- closeSocket(sock);
- LOG.warn("Exception reading or writing challenge: " + e.toString());
- return;
- }
-
- // do authenticating learner
- LOG.debug("Authenticating learner server.id: {}", sid);
- authServer.authenticate(sock, din);
-
- //If wins the challenge, then close the new connection.
- if (sid < this.mySid) {
- /*
- * This replica might still believe that the connection to sid is
- * up, so we have to shut down the workers before trying to open a
- * new connection.
- */
- SendWorker sw = senderWorkerMap.get(sid);
- if (sw != null) {
- sw.finish();
- }
-
- /*
- * Now we start a new connection
- */
- LOG.debug("Create new connection to server: " + sid);
- closeSocket(sock);
- connectOne(sid);
-
- // Otherwise start worker threads to receive data.
- } else {
- SendWorker sw = new SendWorker(sock, sid);
- RecvWorker rw = new RecvWorker(sock, din, sid, sw);
- sw.setRecv(rw);
-
- SendWorker vsw = senderWorkerMap.get(sid);
-
- if(vsw != null)
- vsw.finish();
-
- senderWorkerMap.put(sid, sw);
- queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
-
- sw.start();
- rw.start();
-
- return;
- }
- }
-
- /**
- * Processes invoke this message to queue a message to send. Currently,
- * only leader election uses it.
- */
- public void toSend(Long sid, ByteBuffer b) {
- /*
- * If sending message to myself, then simply enqueue it (loopback).
- */
- if (this.mySid == sid) {
- b.position(0);
- addToRecvQueue(new Message(b.duplicate(), sid));
- /*
- * Otherwise send to the corresponding thread to send.
- */
- } else {
- /*
- * Start a new connection if doesn't have one already.
- */
- ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
- ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq);
- if (bqExisting != null) {
- addToSendQueue(bqExisting, b);
- } else {
- addToSendQueue(bq, b);
- }
- connectOne(sid);
-
- }
- }
-
- /**
- * Try to establish a connection to server with id sid.
- *
- * @param sid server id
- */
- synchronized public void connectOne(long sid){
- if (!connectedToPeer(sid)){
- InetSocketAddress electionAddr;
- if (view.containsKey(sid)) {
- electionAddr = view.get(sid).electionAddr;
- } else {
- LOG.warn("Invalid server id: " + sid);
- return;
- }
- try {
-
- LOG.debug("Opening channel to server " + sid);
- Socket sock = new Socket();
- setSockOpts(sock);
- sock.connect(view.get(sid).electionAddr, cnxTO);
- LOG.debug("Connected to server " + sid);
-
- // Sends connection request asynchronously if the quorum
- // sasl authentication is enabled. This is required because
- // sasl server authentication process may take few seconds to
- // finish, this may delay next peer connection requests.
- if (quorumSaslAuthEnabled) {
- initiateConnectionAsync(sock, sid);
- } else {
- initiateConnection(sock, sid);
- }
- } catch (UnresolvedAddressException e) {
- // Sun doesn't include the address that causes this
- // exception to be thrown, also UAE cannot be wrapped cleanly
- // so we log the exception in order to capture this critical
- // detail.
- LOG.warn("Cannot open channel to " + sid
- + " at election address " + electionAddr, e);
- // Resolve hostname for this server in case the
- // underlying ip address has changed.
- if (view.containsKey(sid)) {
- view.get(sid).recreateSocketAddresses();
- }
- throw e;
- } catch (IOException e) {
- LOG.warn("Cannot open channel to " + sid
- + " at election address " + electionAddr,
- e);
- // We can't really tell if the server is actually down or it failed
- // to connect to the server because the underlying IP address
- // changed. Resolve the hostname again just in case.
- if (view.containsKey(sid)) {
- view.get(sid).recreateSocketAddresses();
- }
- }
- } else {
- LOG.debug("There is a connection already for server " + sid);
- }
- }
-
-
- /**
- * Try to establish a connection with each server if one
- * doesn't exist.
- */
-
- public void connectAll(){
- long sid;
- for(Enumeration<Long> en = queueSendMap.keys();
- en.hasMoreElements();){
- sid = en.nextElement();
- connectOne(sid);
- }
- }
-
-
- /**
- * Check if all queues are empty, indicating that all messages have been delivered.
- */
- boolean haveDelivered() {
- for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
- LOG.debug("Queue size: " + queue.size());
- if (queue.size() == 0) {
- return true;
- }
- }
-
- return false;
- }
-
- /**
- * Flag that it is time to wrap up all activities and interrupt the listener.
- */
- public void halt() {
- shutdown = true;
- LOG.debug("Halting listener");
- listener.halt();
-
- softHalt();
-
- // clear data structures used for auth
- if (connectionExecutor != null) {
- connectionExecutor.shutdown();
- }
- inprogressConnections.clear();
- resetConnectionThreadCount();
- }
-
- /**
- * A soft halt simply finishes workers.
- */
- public void softHalt() {
- for (SendWorker sw : senderWorkerMap.values()) {
- LOG.debug("Halting sender: " + sw);
- sw.finish();
- }
- }
-
- /**
- * Helper method to set socket options.
- *
- * @param sock
- * Reference to socket
- */
- private void setSockOpts(Socket sock) throws SocketException {
- sock.setTcpNoDelay(true);
- sock.setKeepAlive(tcpKeepAlive);
- sock.setSoTimeout(socketTimeout);
- }
-
- /**
- * Helper method to close a socket.
- *
- * @param sock
- * Reference to socket
- */
- private void closeSocket(Socket sock) {
- try {
- sock.close();
- } catch (IOException ie) {
- LOG.error("Exception while closing", ie);
- }
- }
-
- /**
- * Return number of worker threads
- */
- public long getThreadCount() {
- return threadCnt.get();
- }
-
- /**
- * Return number of connection processing threads.
- */
- public long getConnectionThreadCount() {
- return connectionThreadCnt.get();
- }
-
- /**
- * Reset the value of connection processing threads count to zero.
- */
- private void resetConnectionThreadCount() {
- connectionThreadCnt.set(0);
- }
-
- /**
- * Thread to listen on some port
- */
- public class Listener extends ZooKeeperThread {
-
- volatile ServerSocket ss = null;
-
- public Listener() {
- // During startup of thread, thread name will be overridden to
- // specific election address
- super("ListenerThread");
- }
-
- /**
- * Sleeps on accept().
- */
- @Override
- public void run() {
- int numRetries = 0;
- InetSocketAddress addr;
- while((!shutdown) && (numRetries < 3)){
- try {
- ss = new ServerSocket();
- ss.setReuseAddress(true);
- if (listenOnAllIPs) {
- int port = view.get(QuorumCnxManager.this.mySid)
- .electionAddr.getPort();
- addr = new InetSocketAddress(port);
- } else {
- addr = view.get(QuorumCnxManager.this.mySid)
- .electionAddr;
- }
- LOG.info("My election bind port: " + addr.toString());
- setName(view.get(QuorumCnxManager.this.mySid)
- .electionAddr.toString());
- ss.bind(addr);
- while (!shutdown) {
- Socket client = ss.accept();
- setSockOpts(client);
- LOG.info("Received connection request "
- + client.getRemoteSocketAddress());
-
- // Receive and handle the connection request
- // asynchronously if the quorum sasl authentication is
- // enabled. This is required because sasl server
- // authentication process may take few seconds to finish,
- // this may delay next peer connection requests.
- if (quorumSaslAuthEnabled) {
- receiveConnectionAsync(client);
- } else {
- receiveConnection(client);
- }
-
- numRetries = 0;
- }
- } catch (IOException e) {
- LOG.error("Exception while listening", e);
- numRetries++;
- try {
- ss.close();
- Thread.sleep(1000);
- } catch (IOException ie) {
- LOG.error("Error closing server socket", ie);
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while sleeping. " +
- "Ignoring exception", ie);
- }
- }
- }
- LOG.info("Leaving listener");
- if (!shutdown) {
- LOG.error("As I'm leaving the listener thread, "
- + "I won't be able to participate in leader "
- + "election any longer: "
- + view.get(QuorumCnxManager.this.mySid).electionAddr);
- }
- }
-
- /**
- * Halts this listener thread.
- */
- void halt(){
- try{
- LOG.debug("Trying to close listener: " + ss);
- if(ss != null) {
- LOG.debug("Closing listener: "
- + QuorumCnxManager.this.mySid);
- ss.close();
- }
- } catch (IOException e){
- LOG.warn("Exception when shutting down listener: " + e);
- }
- }
- }
-
- /**
- * Thread to send messages. Instance waits on a queue, and send a message as
- * soon as there is one available. If connection breaks, then opens a new
- * one.
- */
- class SendWorker extends ZooKeeperThread {
- Long sid;
- Socket sock;
- RecvWorker recvWorker;
- volatile boolean running = true;
- DataOutputStream dout;
-
- /**
- * An instance of this thread receives messages to send
- * through a queue and sends them to the server sid.
- *
- * @param sock
- * Socket to remote peer
- * @param sid
- * Server identifier of remote peer
- */
- SendWorker(Socket sock, Long sid) {
- super("SendWorker:" + sid);
- this.sid = sid;
- this.sock = sock;
- recvWorker = null;
- try {
- dout = new DataOutputStream(sock.getOutputStream());
- } catch (IOException e) {
- LOG.error("Unable to access socket output stream", e);
- closeSocket(sock);
- running = false;
- }
- LOG.debug("Address of remote peer: " + this.sid);
- }
-
- synchronized void setRecv(RecvWorker recvWorker) {
- this.recvWorker = recvWorker;
- }
-
- /**
- * Returns RecvWorker that pairs up with this SendWorker.
- *
- * @return RecvWorker
- */
- synchronized RecvWorker getRecvWorker(){
- return recvWorker;
- }
-
- synchronized boolean finish() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Calling finish for " + sid);
- }
-
- if(!running){
- /*
- * Avoids running finish() twice.
- */
- return running;
- }
-
- running = false;
- closeSocket(sock);
- // channel = null;
-
- this.interrupt();
- if (recvWorker != null) {
- recvWorker.finish();
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
- }
- senderWorkerMap.remove(sid, this);
- threadCnt.decrementAndGet();
- return running;
- }
-
- synchronized void send(ByteBuffer b) throws IOException {
- byte[] msgBytes = new byte[b.capacity()];
- try {
- b.position(0);
- b.get(msgBytes);
- } catch (BufferUnderflowException be) {
- LOG.error("BufferUnderflowException ", be);
- return;
- }
- dout.writeInt(b.capacity());
- dout.write(b.array());
- dout.flush();
- }
-
- @Override
- public void run() {
- threadCnt.incrementAndGet();
- try {
- /**
- * If there is nothing in the queue to send, then we
- * send the lastMessage to ensure that the last message
- * was received by the peer. The message could be dropped
- * in case self or the peer shutdown their connection
- * (and exit the thread) prior to reading/processing
- * the last message. Duplicate messages are handled correctly
- * by the peer.
- *
- * If the send queue is non-empty, then we have a recent
- * message than that stored in lastMessage. To avoid sending
- * stale message, we should send the message in the send queue.
- */
- ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
- if (bq == null || isSendQueueEmpty(bq)) {
- ByteBuffer b = lastMessageSent.get(sid);
- if (b != null) {
- LOG.debug("Attempting to send lastMessage to sid=" + sid);
- send(b);
- }
- }
- } catch (IOException e) {
- LOG.error("Failed to send last message. Shutting down thread.", e);
- this.finish();
- }
-
- try {
- while (running && !shutdown && sock != null) {
-
- ByteBuffer b = null;
- try {
- ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
- .get(sid);
- if (bq != null) {
- b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
- } else {
- LOG.error("No queue of incoming messages for " +
- "server " + sid);
- break;
- }
-
- if(b != null){
- lastMessageSent.put(sid, b);
- send(b);
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for message on queue",
- e);
- }
- }
- } catch (Exception e) {
- LOG.warn("Exception when using channel: for id " + sid
- + " my id = " + QuorumCnxManager.this.mySid
- + " error = " + e);
- }
- this.finish();
- LOG.warn("Send worker leaving thread");
- }
- }
-
- /**
- * Thread to receive messages. Instance waits on a socket read. If the
- * channel breaks, then removes itself from the pool of receivers.
- */
- class RecvWorker extends ZooKeeperThread {
- Long sid;
- Socket sock;
- volatile boolean running = true;
- final DataInputStream din;
- final SendWorker sw;
-
- RecvWorker(Socket sock, DataInputStream din, Long sid, SendWorker sw) {
- super("RecvWorker:" + sid);
- this.sid = sid;
- this.sock = sock;
- this.sw = sw;
- this.din = din;
- try {
- // OK to wait until socket disconnects while reading.
- sock.setSoTimeout(0);
- } catch (IOException e) {
- LOG.error("Error while accessing socket for " + sid, e);
- closeSocket(sock);
- running = false;
- }
- }
-
- /**
- * Shuts down this worker
- *
- * @return boolean Value of variable running
- */
- synchronized boolean finish() {
- if(!running){
- /*
- * Avoids running finish() twice.
- */
- return running;
- }
- running = false;
-
- this.interrupt();
- threadCnt.decrementAndGet();
- return running;
- }
-
- @Override
- public void run() {
- threadCnt.incrementAndGet();
- try {
- while (running && !shutdown && sock != null) {
- /**
- * Reads the first int to determine the length of the
- * message
- */
- int length = din.readInt();
- if (length <= 0 || length > PACKETMAXSIZE) {
- throw new IOException(
- "Received packet with invalid packet: "
- + length);
- }
- /**
- * Allocates a new ByteBuffer to receive the message
- */
- byte[] msgArray = new byte[length];
- din.readFully(msgArray, 0, length);
- ByteBuffer message = ByteBuffer.wrap(msgArray);
- addToRecvQueue(new Message(message.duplicate(), sid));
- }
- } catch (Exception e) {
- LOG.warn("Connection broken for id " + sid + ", my id = "
- + QuorumCnxManager.this.mySid + ", error = " , e);
- } finally {
- LOG.warn("Interrupting SendWorker");
- sw.finish();
- if (sock != null) {
- closeSocket(sock);
- }
- }
- }
- }
-
- /**
- * Inserts an element in the specified queue. If the Queue is full, this
- * method removes an element from the head of the Queue and then inserts
- * the element at the tail. It can happen that the an element is removed
- * by another thread in {@link SendWorker#processMessage() processMessage}
- * method before this method attempts to remove an element from the queue.
- * This will cause {@link ArrayBlockingQueue#remove() remove} to throw an
- * exception, which is safe to ignore.
- *
- * Unlike {@link #addToRecvQueue(Message) addToRecvQueue} this method does
- * not need to be synchronized since there is only one thread that inserts
- * an element in the queue and another thread that reads from the queue.
- *
- * @param queue
- * Reference to the Queue
- * @param buffer
- * Reference to the buffer to be inserted in the queue
- */
- private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
- ByteBuffer buffer) {
- if (queue.remainingCapacity() == 0) {
- try {
- queue.remove();
- } catch (NoSuchElementException ne) {
- // element could be removed by poll()
- LOG.debug("Trying to remove from an empty " +
- "Queue. Ignoring exception " + ne);
- }
- }
- try {
- queue.add(buffer);
- } catch (IllegalStateException ie) {
- // This should never happen
- LOG.error("Unable to insert an element in the queue " + ie);
- }
- }
-
- /**
- * Returns true if queue is empty.
- * @param queue
- * Reference to the queue
- * @return
- * true if the specified queue is empty
- */
- private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
- return queue.isEmpty();
- }
-
- /**
- * Retrieves and removes buffer at the head of this queue,
- * waiting up to the specified wait time if necessary for an element to
- * become available.
- *
- * {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
- */
- private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
- long timeout, TimeUnit unit) throws InterruptedException {
- return queue.poll(timeout, unit);
- }
-
- /**
- * Inserts an element in the {@link #recvQueue}. If the Queue is full, this
- * methods removes an element from the head of the Queue and then inserts
- * the element at the tail of the queue.
- *
- * This method is synchronized to achieve fairness between two threads that
- * are trying to insert an element in the queue. Each thread checks if the
- * queue is full, then removes the element at the head of the queue, and
- * then inserts an element at the tail. This three-step process is done to
- * prevent a thread from blocking while inserting an element in the queue.
- * If we do not synchronize the call to this method, then a thread can grab
- * a slot in the queue created by the second thread. This can cause the call
- * to insert by the second thread to fail.
- * Note that synchronizing this method does not block another thread
- * from polling the queue since that synchronization is provided by the
- * queue itself.
- *
- * @param msg
- * Reference to the message to be inserted in the queue
- */
- public void addToRecvQueue(Message msg) {
- synchronized(recvQLock) {
- if (recvQueue.remainingCapacity() == 0) {
- try {
- recvQueue.remove();
- } catch (NoSuchElementException ne) {
- // element could be removed by poll()
- LOG.debug("Trying to remove from an empty " +
- "recvQueue. Ignoring exception " + ne);
- }
- }
- try {
- recvQueue.add(msg);
- } catch (IllegalStateException ie) {
- // This should never happen
- LOG.error("Unable to insert element in the recvQueue " + ie);
- }
- }
- }
-
- /**
- * Retrieves and removes a message at the head of this queue,
- * waiting up to the specified wait time if necessary for an element to
- * become available.
- *
- * {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
- */
- public Message pollRecvQueue(long timeout, TimeUnit unit)
- throws InterruptedException {
- return recvQueue.poll(timeout, unit);
- }
-
- public boolean connectedToPeer(long peerSid) {
- return senderWorkerMap.get(peerSid) != null;
- }
-}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c0aa3b3f/src/java/main/org/apache/zookeeper/server/quorum/QuorumMXBean.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumMXBean.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumMXBean.java
deleted file mode 100644
index 2edce68..0000000
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumMXBean.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-/**
- * An MBean representing a zookeeper cluster nodes (aka quorum peers)
- */
-public interface QuorumMXBean {
- /**
- * @return the name of the quorum
- */
- public String getName();
-
- /**
- * @return configured number of peers in the quorum
- */
- public int getQuorumSize();
-}
|