From commits-return-7449-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Fri Dec 7 12:41:58 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 046F1180647 for ; Fri, 7 Dec 2018 12:41:56 +0100 (CET) Received: (qmail 59036 invoked by uid 500); 7 Dec 2018 11:41:56 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 59024 invoked by uid 99); 7 Dec 2018 11:41:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Dec 2018 11:41:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0FC32E11F7; Fri, 7 Dec 2018 11:41:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andor@apache.org To: commits@zookeeper.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: zookeeper git commit: ZOOKEEPER-2778: QuorumPeer: address potential reconfiguration deadlocks Date: Fri, 7 Dec 2018 11:41:56 +0000 (UTC) Repository: zookeeper Updated Branches: refs/heads/master b7403b790 -> 6ea3c0b68 ZOOKEEPER-2778: QuorumPeer: address potential reconfiguration deadlocks * QuorumPeer: encapsulate quorum/election/client addresses in an AddressTuple held through an AtomicReference * QuorumPeer/QuorumCnxManager: address deadlock and visibility issues * QuorumPeer: add fast path for already-non-null quorum/election address * QuorumPeer: fix access to newly private data members from ReconfigTest * LeaderBeanTest: set up mock QuorumVerifier so that addresses get set * QuorumPeer: warn when clobbering existing election algorithm * QuorumPeer: halt old QCM when clobbering existing election algorithm Author: Michael Edwards Reviewers: andor@apache.org Closes #719 from mkedwards/ZOOKEEPER-2778-for-master Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/6ea3c0b6 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/6ea3c0b6 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/6ea3c0b6 Branch: refs/heads/master Commit: 6ea3c0b6897ec3a833f5b86fe8612bc9b2ac672c Parents: b7403b7 Author: Michael Edwards Authored: Fri Dec 7 12:41:50 2018 +0100 Committer: Andor Molnar Committed: Fri Dec 7 12:41:50 2018 +0100 ---------------------------------------------------------------------- .../zookeeper/server/quorum/QuorumPeer.java | 179 +++++++++++-------- .../zookeeper/server/quorum/LeaderBeanTest.java | 19 ++ .../org/apache/zookeeper/test/ReconfigTest.java | 4 +- 3 files changed, 129 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/6ea3c0b6/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index b33404e..035db0c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -41,6 +41,7 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import javax.security.sasl.SaslException; @@ -114,7 +115,11 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider LocalPeerBean jmxLocalPeerBean; private Map jmxRemotePeerBean; LeaderElectionBean jmxLeaderElectionBean; - private QuorumCnxManager qcm; + + // The QuorumCnxManager is held through an AtomicReference to ensure cross-thread visibility + // of updates; see the implementation comment at setLastSeenQuorumVerifier(). + private AtomicReference qcmRef = new AtomicReference<>(); + QuorumAuthServer authServer; QuorumAuthLearner authLearner; @@ -127,6 +132,18 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider */ private ZKDatabase zkDb; + public static final class AddressTuple { + public final InetSocketAddress quorumAddr; + public final InetSocketAddress electionAddr; + public final InetSocketAddress clientAddr; + + public AddressTuple(InetSocketAddress quorumAddr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) { + this.quorumAddr = quorumAddr; + this.electionAddr = electionAddr; + this.clientAddr = clientAddr; + } + } + public static class QuorumServer { public InetSocketAddress addr = null; @@ -442,10 +459,10 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider */ //last committed quorum verifier - public QuorumVerifier quorumVerifier; + private QuorumVerifier quorumVerifier; //last proposed quorum verifier - public QuorumVerifier lastSeenQuorumVerifier = null; + private QuorumVerifier lastSeenQuorumVerifier = null; // Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier. final Object QV_LOCK = new Object(); @@ -716,16 +733,14 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider DatagramSocket udpSocket; - private InetSocketAddress myQuorumAddr; - private InetSocketAddress myElectionAddr = null; - private InetSocketAddress myClientAddr = null; + private final AtomicReference myAddrs = new AtomicReference<>(); /** * Resolves hostname for a given server ID. * * This method resolves hostname for a given server ID in both quorumVerifer * and lastSeenQuorumVerifier. If the server ID matches the local server ID, - * it also updates myQuorumAddr and myElectionAddr. + * it also updates myAddrs. */ public void recreateSocketAddresses(long id) { QuorumVerifier qv = getQuorumVerifier(); @@ -734,8 +749,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider if (qs != null) { qs.recreateSocketAddresses(); if (id == getId()) { - setQuorumAddress(qs.addr); - setElectionAddress(qs.electionAddr); + setAddrs(qs.addr, qs.electionAddr, qs.clientAddr); } } } @@ -748,42 +762,46 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider } } - public InetSocketAddress getQuorumAddress(){ - synchronized (QV_LOCK) { - return myQuorumAddr; + private AddressTuple getAddrs(){ + AddressTuple addrs = myAddrs.get(); + if (addrs != null) { + return addrs; } - } - - public void setQuorumAddress(InetSocketAddress addr){ - synchronized (QV_LOCK) { - myQuorumAddr = addr; + try { + synchronized (QV_LOCK) { + addrs = myAddrs.get(); + while (addrs == null) { + QV_LOCK.wait(); + addrs = myAddrs.get(); + } + return addrs; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } } - public InetSocketAddress getElectionAddress(){ - synchronized (QV_LOCK) { - return myElectionAddr; - } + public InetSocketAddress getQuorumAddress(){ + return getAddrs().quorumAddr; } - public void setElectionAddress(InetSocketAddress addr){ - synchronized (QV_LOCK) { - myElectionAddr = addr; - } + public InetSocketAddress getElectionAddress(){ + return getAddrs().electionAddr; } - + public InetSocketAddress getClientAddress(){ - synchronized (QV_LOCK) { - return myClientAddr; - } + final AddressTuple addrs = myAddrs.get(); + return (addrs == null) ? null : addrs.clientAddr; } - - public void setClientAddress(InetSocketAddress addr){ + + private void setAddrs(InetSocketAddress quorumAddr, InetSocketAddress electionAddr, InetSocketAddress clientAddr){ synchronized (QV_LOCK) { - myClientAddr = addr; + myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr)); + QV_LOCK.notifyAll(); } } - + private int electionType; Election electionAlg; @@ -1050,7 +1068,12 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider le = new AuthFastLeaderElection(this, true); break; case 3: - qcm = createCnxnManager(); + QuorumCnxManager qcm = createCnxnManager(); + QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm); + if (oldQcm != null) { + LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)"); + oldQcm.halt(); + } QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start(); @@ -1515,18 +1538,6 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider } } - private void connectNewPeers(){ - synchronized (QV_LOCK) { - if (qcm != null && quorumVerifier != null && lastSeenQuorumVerifier != null) { - Map committedView = quorumVerifier.getAllMembers(); - for (Entry e : lastSeenQuorumVerifier.getAllMembers().entrySet()) { - if (e.getKey() != getId() && !committedView.containsKey(e.getKey())) - qcm.connectOne(e.getKey()); - } - } - } - } - public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW){ if (qvOLD == null || !qvOLD.equals(qvNEW)) { LOG.warn("Restarting Leader Election"); @@ -1544,33 +1555,61 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix; } + // On entry to this method, qcm must be non-null and the locks on both qcm and QV_LOCK + // must be held. We don't want quorumVerifier/lastSeenQuorumVerifier to change out from + // under us, so we have to hold QV_LOCK; and since the call to qcm.connectOne() will take + // the lock on qcm (and take QV_LOCK again inside that), the caller needs to have taken + // qcm outside QV_LOCK to avoid a deadlock against other callers of qcm.connectOne(). + private void connectNewPeers(QuorumCnxManager qcm){ + if (quorumVerifier != null && lastSeenQuorumVerifier != null) { + Map committedView = quorumVerifier.getAllMembers(); + for (Entry e : lastSeenQuorumVerifier.getAllMembers().entrySet()) { + if (e.getKey() != getId() && !committedView.containsKey(e.getKey())) + qcm.connectOne(e.getKey()); + } + } + } + public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){ - synchronized (QV_LOCK) { - if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) { - LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() + - ". Current version: " + quorumVerifier.getVersion()); + // If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm + // and then take QV_LOCK. Take the locks in the same order to ensure that we don't + // deadlock against other callers of connectOne(). If qcmRef gets set in another + // thread while we're inside the synchronized block, that does no harm; if we didn't + // take a lock on qcm (because it was null when we sampled it), we won't call + // connectOne() on it. (Use of an AtomicReference is enough to guarantee visibility + // of updates that provably happen in another thread before entering this method.) + QuorumCnxManager qcm = qcmRef.get(); + Object outerLockObject = (qcm != null) ? qcm : QV_LOCK; + synchronized (outerLockObject) { + synchronized (QV_LOCK) { + if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) { + LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() + + ". Current version: " + quorumVerifier.getVersion()); + } + // assuming that a version uniquely identifies a configuration, so if + // version is the same, nothing to do here. + if (lastSeenQuorumVerifier != null && + lastSeenQuorumVerifier.getVersion() == qv.getVersion()) { + return; + } + lastSeenQuorumVerifier = qv; + if (qcm != null) { + connectNewPeers(qcm); + } - } - // assuming that a version uniquely identifies a configuration, so if - // version is the same, nothing to do here. - if (lastSeenQuorumVerifier != null && - lastSeenQuorumVerifier.getVersion() == qv.getVersion()) { - return; - } - lastSeenQuorumVerifier = qv; - connectNewPeers(); - if (writeToDisk) { - try { - String fileName = getNextDynamicConfigFilename(); - if (fileName != null) { - QuorumPeerConfig.writeDynamicConfig(fileName, qv, true); + if (writeToDisk) { + try { + String fileName = getNextDynamicConfigFilename(); + if (fileName != null) { + QuorumPeerConfig.writeDynamicConfig(fileName, qv, true); + } + } catch (IOException e) { + LOG.error("Error writing next dynamic config file to disk: ", e.getMessage()); } - } catch (IOException e) { - LOG.error("Error writing next dynamic config file to disk: ", e.getMessage()); } } } - } + } public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){ synchronized (QV_LOCK) { @@ -1610,9 +1649,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider } QuorumServer qs = qv.getAllMembers().get(getId()); if (qs != null) { - setQuorumAddress(qs.addr); - setElectionAddress(qs.electionAddr); - setClientAddress(qs.clientAddr); + setAddrs(qs.addr, qs.electionAddr, qs.clientAddr); } return prevQV; } @@ -1795,7 +1832,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider * get reference to QuorumCnxManager */ public QuorumCnxManager getQuorumCnxManager() { - return qcm; + return qcmRef.get(); } private long readLongFromFile(String name) throws IOException { File file = new File(logFactory.getSnapDir(), name); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/6ea3c0b6/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java index 9087483..69dac1f 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java @@ -21,11 +21,13 @@ package org.apache.zookeeper.server.quorum; import org.apache.jute.OutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.txn.TxnHeader; @@ -37,6 +39,10 @@ import org.mockito.stubbing.Answer; import java.io.File; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -56,7 +62,20 @@ public class LeaderBeanTest { @Before public void setUp() throws IOException, X509Exception { qp = new QuorumPeer(); + long myId = qp.getId(); + + int clientPort = PortAssignment.unique(); + Map peersView = new HashMap(); + InetAddress clientIP = InetAddress.getLoopbackAddress(); + + peersView.put(Long.valueOf(myId), + new QuorumServer(myId, new InetSocketAddress(clientIP, PortAssignment.unique()), + new InetSocketAddress(clientIP, PortAssignment.unique()), + new InetSocketAddress(clientIP, clientPort), LearnerType.PARTICIPANT)); + QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class); + when(quorumVerifierMock.getAllMembers()).thenReturn(peersView); + qp.setQuorumVerifier(quorumVerifierMock, false); File tmpDir = ClientBase.createEmptyTestDir(); fileTxnSnapLog = new FileTxnSnapLog(new File(tmpDir, "data"), http://git-wip-us.apache.org/repos/asf/zookeeper/blob/6ea3c0b6/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java index 7b39ab1..891a7d2 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java @@ -839,7 +839,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ testNormalOperation(zkArr[4], zkArr[5]); for (int i = 1; i <= 5; i++) { - if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumHierarchical)) + if (!(qu.getPeer(i).peer.getQuorumVerifier() instanceof QuorumHierarchical)) Assert.fail("peer " + i + " doesn't think the quorum system is Hieararchical!"); } @@ -876,7 +876,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{ testNormalOperation(zkArr[1], zkArr[2]); for (int i = 1; i <= 2; i++) { - if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumMaj)) + if (!(qu.getPeer(i).peer.getQuorumVerifier() instanceof QuorumMaj)) Assert.fail("peer " + i + " doesn't think the quorum system is a majority quorum system!");