Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 90D98200B32 for ; Thu, 23 Jun 2016 23:45:58 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8F5EC160A59; Thu, 23 Jun 2016 21:45:58 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5ED82160A35 for ; Thu, 23 Jun 2016 23:45:57 +0200 (CEST) Received: (qmail 52876 invoked by uid 500); 23 Jun 2016 21:45:56 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 52865 invoked by uid 99); 23 Jun 2016 21:45:56 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Jun 2016 21:45:56 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id C07ABC028E for ; Thu, 23 Jun 2016 21:45:55 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.374 X-Spam-Level: X-Spam-Status: No, score=0.374 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 7Q-iAdAxPtmV for ; Thu, 23 Jun 2016 21:45:52 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id 8183D5FCC8 for ; Thu, 23 Jun 2016 21:45:52 +0000 (UTC) Received: from svn01-us-west.apache.org (svn.apache.org [10.41.0.6]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 37707E021C for ; Thu, 23 Jun 2016 21:45:51 +0000 (UTC) Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 367243A0024 for ; Thu, 23 Jun 2016 21:45:51 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1750026 - in /zookeeper/branches/branch-3.5: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/ Date: Thu, 23 Jun 2016 21:45:51 -0000 To: commits@zookeeper.apache.org From: cnauroth@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20160623214551.367243A0024@svn01-us-west.apache.org> archived-at: Thu, 23 Jun 2016 21:45:58 -0000 Author: cnauroth Date: Thu Jun 23 21:45:50 2016 New Revision: 1750026 URL: http://svn.apache.org/viewvc?rev=1750026&view=rev Log: ZOOKEEPER-2366: Reconfiguration of client port causes a socket leak. (fpj via cnauroth) Modified: zookeeper/branches/branch-3.5/CHANGES.txt zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java Modified: zookeeper/branches/branch-3.5/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/CHANGES.txt?rev=1750026&r1=1750025&r2=1750026&view=diff ============================================================================== --- zookeeper/branches/branch-3.5/CHANGES.txt (original) +++ zookeeper/branches/branch-3.5/CHANGES.txt Thu Jun 23 21:45:50 2016 @@ -165,6 +165,9 @@ BUGFIXES: ZOOKEEPER-2380: Deadlock between leader shutdown and forwarding ACK to the leader (Arshad Mohammad via cnauroth) + ZOOKEEPER-2366: Reconfiguration of client port causes a socket leak. + (fpj via cnauroth) + IMPROVEMENTS: ZOOKEEPER-2270: Allow MBeanRegistry to be overridden for better unit tests Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java?rev=1750026&r1=1750025&r2=1750026&view=diff ============================================================================== --- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java (original) +++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java Thu Jun 23 21:45:50 2016 @@ -688,31 +688,42 @@ public class NIOServerCnxnFactory extend ss.configureBlocking(false); acceptThread = new AcceptThread(ss, addr, selectorThreads); } - + + private void tryClose(ServerSocketChannel s) { + try { + s.close(); + } catch (IOException sse) { + LOG.error("Error while closing server socket.", sse); + } + } + @Override - public void reconfigure(InetSocketAddress addr){ + public void reconfigure(InetSocketAddress addr) { ServerSocketChannel oldSS = ss; try { - this.ss = ServerSocketChannel.open(); - ss.socket().setReuseAddress(true); - LOG.info("binding to port " + addr); - ss.socket().bind(addr); - ss.configureBlocking(false); - acceptThread.setReconfiguring(); - oldSS.close(); - acceptThread.wakeupSelector(); - try { - acceptThread.join(); - } catch (InterruptedException e) { - LOG.error("Error joining old acceptThread when reconfiguring client port " + e.getMessage()); - } - acceptThread = new AcceptThread(ss, addr, selectorThreads); - acceptThread.start(); + this.ss = ServerSocketChannel.open(); + ss.socket().setReuseAddress(true); + LOG.info("binding to port " + addr); + ss.socket().bind(addr); + ss.configureBlocking(false); + acceptThread.setReconfiguring(); + tryClose(oldSS); + acceptThread.wakeupSelector(); + try { + acceptThread.join(); + } catch (InterruptedException e) { + LOG.error("Error joining old acceptThread when reconfiguring client port {}", + e.getMessage()); + Thread.currentThread().interrupt(); + } + acceptThread = new AcceptThread(ss, addr, selectorThreads); + acceptThread.start(); } catch(IOException e) { - LOG.error("Error reconfiguring client port to " + addr + " " + e.getMessage()); + LOG.error("Error reconfiguring client port to {} {}", addr, e.getMessage()); + tryClose(oldSS); } } - + /** {@inheritDoc} */ public int getMaxClientCnxnsPerHost() { return maxClientCnxns; Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java?rev=1750026&r1=1750025&r2=1750026&view=diff ============================================================================== --- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java (original) +++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java Thu Jun 23 21:45:50 2016 @@ -488,13 +488,17 @@ public class NettyServerCnxnFactory exte parentChannel = bootstrap.bind(localAddress); } - public void reconfigure(InetSocketAddress addr) - { + public void reconfigure(InetSocketAddress addr) { Channel oldChannel = parentChannel; - LOG.info("binding to port " + addr); - parentChannel = bootstrap.bind(addr); - localAddress = addr; - oldChannel.close(); + try { + LOG.info("binding to port {}", addr); + parentChannel = bootstrap.bind(addr); + localAddress = addr; + } catch (Exception e) { + LOG.error("Error while reconfiguring", e); + } finally { + oldChannel.close(); + } } @Override Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1750026&r1=1750025&r2=1750026&view=diff ============================================================================== --- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original) +++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Jun 23 21:45:50 2016 @@ -710,7 +710,7 @@ public class Leader { // that different operations wait for different sets of acks, and we still want to enforce // that they are committed in order. Currently we only permit one outstanding reconfiguration // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is - // pending all wait for a quorum of old and new config, so its not possible to get enough acks + // pending all wait for a quorum of old and new config, so it's not possible to get enough acks // for an operation without getting enough acks for preceding ops. But in the future if multiple // concurrent reconfigs are allowed, this can happen. if (outstandingProposals.containsKey(zxid - 1)) return false; @@ -751,7 +751,7 @@ public class Leader { QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier(); self.processReconfig(newQV, designatedLeader, zk.getZxid(), true); - + if (designatedLeader != self.getId()) { allowedToCommit = false; } @@ -1261,7 +1261,7 @@ public class Leader { QuorumVerifier newQV = self.getLastSeenQuorumVerifier(); Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid()); - + self.processReconfig(newQV, designatedLeader, zk.getZxid(), true); if (designatedLeader != self.getId()) { allowedToCommit = false; Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1750026&r1=1750025&r2=1750026&view=diff ============================================================================== --- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original) +++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Jun 23 21:45:50 2016 @@ -1728,7 +1728,7 @@ public class QuorumPeer extends ZooKeepe writeLongToFile(ACCEPTED_EPOCH_FILENAME, e); } - public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE){ + public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) { InetSocketAddress oldClientAddr = getClientAddress(); // update last committed quorum verifier, write the new config to disk @@ -1756,8 +1756,8 @@ public class QuorumPeer extends ZooKeepe cnxnFactory.reconfigure(myNewQS.clientAddr); updateThreadName(); } - - boolean roleChange = updateLearnerType(qv); + + boolean roleChange = updateLearnerType(qv); boolean leaderChange = false; if (suggestedLeaderId != null) { // zxid should be non-null too @@ -1875,7 +1875,9 @@ public class QuorumPeer extends ZooKeepe } private void updateThreadName() { - String plain = cnxnFactory != null ? cnxnFactory.getLocalAddress().toString() : "disabled"; + String plain = cnxnFactory != null ? + cnxnFactory.getLocalAddress() != null ? + cnxnFactory.getLocalAddress().toString() : "disabled" : "disabled"; String secure = secureCnxnFactory != null ? secureCnxnFactory.getLocalAddress().toString() : "disabled"; setName(String.format("QuorumPeer[myid=%d](plain=%s)(secure=%s)", getId(), plain, secure)); } Modified: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java?rev=1750026&r1=1750025&r2=1750026&view=diff ============================================================================== --- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java (original) +++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java Thu Jun 23 21:45:50 2016 @@ -30,6 +30,7 @@ import org.junit.runners.Suite; ClientTest.class, FourLetterWordsTest.class, NullDataTest.class, + ReconfigTest.class, SessionTest.class, WatcherTest.class }) Modified: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java?rev=1750026&r1=1750025&r2=1750026&view=diff ============================================================================== --- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java (original) +++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java Thu Jun 23 21:45:50 2016 @@ -30,6 +30,7 @@ import org.junit.runners.Suite; ClientTest.class, FourLetterWordsTest.class, NullDataTest.class, + ReconfigTest.class, SessionTest.class, WatcherTest.class }) Modified: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java?rev=1750026&r1=1750025&r2=1750026&view=diff ============================================================================== --- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java (original) +++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java Thu Jun 23 21:45:50 2016 @@ -18,13 +18,18 @@ package org.apache.zookeeper.test; +import static java.net.InetAddress.getLoopbackAddress; + import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.PortAssignment; @@ -33,18 +38,16 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.jmx.CommonNames; import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.QuorumStats; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical; import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; -import org.junit.Assert; import org.junit.After; +import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -650,6 +653,89 @@ public class ReconfigTest extends ZKTest closeAllHandles(zkArr); } + @Test + public void testPortChangeToBlockedPortFollower() throws Exception { + testPortChangeToBlockedPort(false); + } + @Test + public void testPortChangeToBlockedPortLeader() throws Exception { + testPortChangeToBlockedPort(true); + } + + private void testPortChangeToBlockedPort(boolean testLeader) throws Exception { + qu = new QuorumUtil(1); // create 3 servers + qu.disableJMXTest = true; + qu.startAll(); + ZooKeeper[] zkArr = createHandles(qu); + + List joiningServers = new ArrayList(); + + int leaderIndex = getLeaderId(qu); + int followerIndex = leaderIndex == 1 ? 2 : 1; + int serverIndex = testLeader ? leaderIndex : followerIndex; + int reconfigIndex = testLeader ? followerIndex : leaderIndex; + + // modify server's client port + int quorumPort = qu.getPeer(serverIndex).peer.getQuorumAddress().getPort(); + int electionPort = qu.getPeer(serverIndex).peer.getElectionAddress().getPort(); + int oldClientPort = qu.getPeer(serverIndex).peer.getClientPort(); + int newClientPort = PortAssignment.unique(); + + try(ServerSocket ss = new ServerSocket()) { + ss.bind(new InetSocketAddress(getLoopbackAddress(), newClientPort)); + + joiningServers.add("server." + serverIndex + "=localhost:" + quorumPort + + ":" + electionPort + ":participant;localhost:" + newClientPort); + + // create a /test znode and check that read/write works before + // any reconfig is invoked + testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]); + + // Reconfigure + reconfig(zkArr[reconfigIndex], joiningServers, null, null, -1); + Thread.sleep(1000); + + // The follower reconfiguration will have failed + zkArr[serverIndex].close(); + zkArr[serverIndex] = new ZooKeeper("127.0.0.1:" + + newClientPort, + ClientBase.CONNECTION_TIMEOUT, new Watcher() { + public void process(WatchedEvent event) {}}); + + try { + Thread.sleep(1000); + zkArr[serverIndex].setData("/test", "teststr".getBytes(), -1); + Assert.fail("New client connected to new client port!"); + } catch (KeeperException.ConnectionLossException e) { + // Exception is expected + } + + //The old port should be clear at this stage + + try (ServerSocket ss2 = new ServerSocket()) { + ss2.bind(new InetSocketAddress(getLoopbackAddress(), oldClientPort)); + } + + // Move back to the old port + joiningServers.clear(); + joiningServers.add("server." + serverIndex + "=localhost:" + quorumPort + + ":" + electionPort + ":participant;localhost:" + oldClientPort); + + reconfig(zkArr[reconfigIndex], joiningServers, null, null, -1); + + zkArr[serverIndex].close(); + zkArr[serverIndex] = new ZooKeeper("127.0.0.1:" + + oldClientPort, + ClientBase.CONNECTION_TIMEOUT, new Watcher() { + public void process(WatchedEvent event) {}}); + + testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]); + testServerHasConfig(zkArr[serverIndex], joiningServers, null); + Assert.assertEquals(oldClientPort, qu.getPeer(serverIndex).peer.getClientPort()); + } + closeAllHandles(zkArr); + } + @Test public void testUnspecifiedClientAddress() throws Exception { int[] ports = new int[3];