From commits-return-6509-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Wed Jul 11 06:04:23 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 9A8CE180634 for ; Wed, 11 Jul 2018 06:04:22 +0200 (CEST) Received: (qmail 62302 invoked by uid 500); 11 Jul 2018 04:04:21 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 62289 invoked by uid 99); 11 Jul 2018 04:04:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Jul 2018 04:04:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 41CEEDFAD1; Wed, 11 Jul 2018 04:04:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: hanm@apache.org To: commits@zookeeper.apache.org Message-Id: <1459c72842414104afd6cf8e4ded4061@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: zookeeper git commit: ZOOKEEPER-3084: Exit when ZooKeeper cannot bind to the leader election port Date: Wed, 11 Jul 2018 04:04:21 +0000 (UTC) Repository: zookeeper Updated Branches: refs/heads/master 5fdd70ac4 -> c2e7ed1e6 ZOOKEEPER-3084: Exit when ZooKeeper cannot bind to the leader election port Author: Fangmin Lyu Reviewers: Andor Molnár , Benjamin Reed , Norbert Kalmar , Michael Han Closes #562 from lvfangmin/ZOOKEEPER-3084 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/c2e7ed1e Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/c2e7ed1e Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/c2e7ed1e Branch: refs/heads/master Commit: c2e7ed1e6f8f2de48778db7f3d63f9629c086ea8 Parents: 5fdd70a Author: Fangmin Lyu Authored: Tue Jul 10 21:04:17 2018 -0700 Committer: Michael Han Committed: Tue Jul 10 21:04:17 2018 -0700 ---------------------------------------------------------------------- .../server/quorum/QuorumCnxManager.java | 107 ++++++++++--------- 1 file changed, 58 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c2e7ed1e/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java index 09da63a..705b846 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java @@ -23,6 +23,7 @@ import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.net.BindException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; @@ -58,11 +59,11 @@ import org.slf4j.LoggerFactory; * maintains one connection for every pair of servers. The tricky part is to * guarantee that there is exactly one connection for every pair of servers that * are operating correctly and that can communicate over the network. - * + * * If two servers try to start a connection concurrently, then the connection * manager uses a very simple tie-breaking mechanism to decide which connection - * to drop based on the IP addressed of the two parties. - * + * to drop based on the IP addressed of the two parties. + * * For every peer, the manager maintains a queue of messages to send. If the * connection to any particular peer drops, then the sender thread puts the * message back on the list. As this implementation currently uses a queue @@ -70,7 +71,7 @@ import org.slf4j.LoggerFactory; * message to the tail of the queue, thus changing the order of messages. * Although this is not a problem for the leader election, it could be a problem * when consolidating peer communication. This is to be verified, though. - * + * */ public class QuorumCnxManager { @@ -85,7 +86,7 @@ public class QuorumCnxManager { static final int SEND_CAPACITY = 1; static final int PACKETMAXSIZE = 1024 * 512; - + /* * Negative counter for observer server ids. */ @@ -103,9 +104,9 @@ public class QuorumCnxManager { static public final int maxBuffer = 2048; /* - * Connection time out value in milliseconds + * Connection time out value in milliseconds */ - + private int cnxTO = 5000; final QuorumPeer self; @@ -255,12 +256,12 @@ public class QuorumCnxManager { this.queueSendMap = new ConcurrentHashMap>(); this.senderWorkerMap = new ConcurrentHashMap(); this.lastMessageSent = new ConcurrentHashMap(); - + String cnxToValue = System.getProperty("zookeeper.cnxTimeout"); if(cnxToValue != null){ this.cnxTO = Integer.parseInt(cnxToValue); } - + this.self = self; this.mySid = mySid; @@ -313,7 +314,7 @@ public class QuorumCnxManager { /** * Invokes initiateConnection for testing purposes - * + * * @param sid */ public void testInitiateConnection(long sid) throws Exception { @@ -436,24 +437,24 @@ public class QuorumCnxManager { sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); - + if(vsw != null) vsw.finish(); - + senderWorkerMap.put(sid, sw); queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue( SEND_CAPACITY)); - + sw.start(); rw.start(); - - return true; - + + return true; + } return false; } - - + + /** * If this server receives a connection request, then it gives up on the new * connection if it wins. Notice that it checks whether it has a connection @@ -575,7 +576,7 @@ public class QuorumCnxManager { sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); - + if (vsw != null) { vsw.finish(); } @@ -584,14 +585,14 @@ public class QuorumCnxManager { queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(SEND_CAPACITY)); - + sw.start(); rw.start(); } } /** - * Processes invoke this message to queue a message to send. Currently, + * Processes invoke this message to queue a message to send. Currently, * only leader election uses it. */ public void toSend(Long sid, ByteBuffer b) { @@ -617,13 +618,13 @@ public class QuorumCnxManager { addToSendQueue(bq, b); } connectOne(sid); - + } } - + /** * Try to establish a connection to server with id sid using its electionAddr. - * + * * @param sid server id * @return boolean success indication */ @@ -666,12 +667,12 @@ public class QuorumCnxManager { closeSocket(sock); return false; } - + } - + /** * Try to establish a connection to server with id sid. - * + * * @param sid server id */ synchronized void connectOne(long sid){ @@ -705,22 +706,22 @@ public class QuorumCnxManager { } } } - - + + /** * Try to establish a connection with each server if one * doesn't exist. */ - + public void connectAll(){ long sid; for(Enumeration en = queueSendMap.keys(); en.hasMoreElements();){ sid = en.nextElement(); connectOne(sid); - } + } } - + /** * Check if all queues are empty, indicating that all messages have been delivered. @@ -743,7 +744,7 @@ public class QuorumCnxManager { shutdown = true; LOG.debug("Halting listener"); listener.halt(); - + // Wait for the listener to terminate. try { listener.join(); @@ -759,7 +760,7 @@ public class QuorumCnxManager { inprogressConnections.clear(); resetConnectionThreadCount(); } - + /** * A soft halt simply finishes workers. */ @@ -772,7 +773,7 @@ public class QuorumCnxManager { /** * Helper method to set socket options. - * + * * @param sock * Reference to socket */ @@ -784,7 +785,7 @@ public class QuorumCnxManager { /** * Helper method to close a socket. - * + * * @param sock * Reference to socket */ @@ -842,6 +843,7 @@ public class QuorumCnxManager { int numRetries = 0; InetSocketAddress addr; Socket client = null; + IOException exitException = null; while((!shutdown) && (numRetries < 3)){ try { ss = new ServerSocket(); @@ -886,6 +888,7 @@ public class QuorumCnxManager { break; } LOG.error("Exception while listening", e); + exitException = e; numRetries++; try { ss.close(); @@ -905,6 +908,12 @@ public class QuorumCnxManager { + "I won't be able to participate in leader " + "election any longer: " + self.getElectionAddress()); + if (exitException instanceof BindException) { + // After leaving listener thread, the host cannot join the + // quorum anymore, this is a severe error that we cannot + // recover from, so we need to exit + System.exit(14); + } } else if (ss != null) { // Clean up for shutdown. try { @@ -948,7 +957,7 @@ public class QuorumCnxManager { /** * An instance of this thread receives messages to send * through a queue and sends them to the server sid. - * + * * @param sock * Socket to remote peer * @param sid @@ -975,23 +984,23 @@ public class QuorumCnxManager { /** * Returns RecvWorker that pairs up with this SendWorker. - * - * @return RecvWorker + * + * @return RecvWorker */ synchronized RecvWorker getRecvWorker(){ return recvWorker; } - + synchronized boolean finish() { LOG.debug("Calling finish for " + sid); - + if(!running){ /* - * Avoids running finish() twice. + * Avoids running finish() twice. */ return running; } - + running = false; closeSocket(sock); @@ -1006,7 +1015,7 @@ public class QuorumCnxManager { threadCnt.decrementAndGet(); return running; } - + synchronized void send(ByteBuffer b) throws IOException { byte[] msgBytes = new byte[b.capacity()]; try { @@ -1050,7 +1059,7 @@ public class QuorumCnxManager { LOG.error("Failed to send last message. Shutting down thread.", e); this.finish(); } - + try { while (running && !shutdown && sock != null) { @@ -1111,20 +1120,20 @@ public class QuorumCnxManager { running = false; } } - + /** * Shuts down this worker - * + * * @return boolean Value of variable running */ synchronized boolean finish() { if(!running){ /* - * Avoids running finish() twice. + * Avoids running finish() twice. */ return running; } - running = false; + running = false; this.interrupt(); threadCnt.decrementAndGet();