Repository: zookeeper
Updated Branches:
refs/heads/master b7403b790 -> 6ea3c0b68
ZOOKEEPER-2778: QuorumPeer: address potential reconfiguration deadlocks
* QuorumPeer: encapsulate quorum/election/client addresses in an AddressTuple held through
an AtomicReference
* QuorumPeer/QuorumCnxManager: address deadlock and visibility issues
* QuorumPeer: add fast path for already-non-null quorum/election address
* QuorumPeer: fix access to newly private data members from ReconfigTest
* LeaderBeanTest: set up mock QuorumVerifier so that addresses get set
* QuorumPeer: warn when clobbering existing election algorithm
* QuorumPeer: halt old QCM when clobbering existing election algorithm
Author: Michael Edwards <medwards@bitpusher.com>
Reviewers: andor@apache.org
Closes #719 from mkedwards/ZOOKEEPER-2778-for-master
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/6ea3c0b6
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/6ea3c0b6
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/6ea3c0b6
Branch: refs/heads/master
Commit: 6ea3c0b6897ec3a833f5b86fe8612bc9b2ac672c
Parents: b7403b7
Author: Michael Edwards <medwards@bitpusher.com>
Authored: Fri Dec 7 12:41:50 2018 +0100
Committer: Andor Molnar <andor@apache.org>
Committed: Fri Dec 7 12:41:50 2018 +0100
----------------------------------------------------------------------
.../zookeeper/server/quorum/QuorumPeer.java | 179 +++++++++++--------
.../zookeeper/server/quorum/LeaderBeanTest.java | 19 ++
.../org/apache/zookeeper/test/ReconfigTest.java | 4 +-
3 files changed, 129 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/6ea3c0b6/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index b33404e..035db0c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -41,6 +41,7 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.sasl.SaslException;
@@ -114,7 +115,11 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
LocalPeerBean jmxLocalPeerBean;
private Map<Long, RemotePeerBean> jmxRemotePeerBean;
LeaderElectionBean jmxLeaderElectionBean;
- private QuorumCnxManager qcm;
+
+ // The QuorumCnxManager is held through a final AtomicReference to ensure cross-thread visibility
+ // of updates; see the implementation comment at setLastSeenQuorumVerifier().
+ private final AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();
+
QuorumAuthServer authServer;
QuorumAuthLearner authLearner;
@@ -127,6 +132,18 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
*/
private ZKDatabase zkDb;
+ public static final class AddressTuple {
+ public final InetSocketAddress quorumAddr;
+ public final InetSocketAddress electionAddr;
+ public final InetSocketAddress clientAddr;
+
+ public AddressTuple(InetSocketAddress quorumAddr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) {
+ this.quorumAddr = quorumAddr;
+ this.electionAddr = electionAddr;
+ this.clientAddr = clientAddr;
+ }
+ }
+
public static class QuorumServer {
public InetSocketAddress addr = null;
@@ -442,10 +459,10 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
*/
//last committed quorum verifier
- public QuorumVerifier quorumVerifier;
+ private QuorumVerifier quorumVerifier;
//last proposed quorum verifier
- public QuorumVerifier lastSeenQuorumVerifier = null;
+ private QuorumVerifier lastSeenQuorumVerifier = null;
// Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier.
final Object QV_LOCK = new Object();
@@ -716,16 +733,14 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
DatagramSocket udpSocket;
- private InetSocketAddress myQuorumAddr;
- private InetSocketAddress myElectionAddr = null;
- private InetSocketAddress myClientAddr = null;
+ private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>();
/**
* Resolves hostname for a given server ID.
*
* This method resolves hostname for a given server ID in both quorumVerifer
* and lastSeenQuorumVerifier. If the server ID matches the local server ID,
- * it also updates myQuorumAddr and myElectionAddr.
+ * it also replaces myAddrs with the resolved quorum, election and client addresses.
*/
public void recreateSocketAddresses(long id) {
QuorumVerifier qv = getQuorumVerifier();
@@ -734,8 +749,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
if (qs != null) {
qs.recreateSocketAddresses();
if (id == getId()) {
- setQuorumAddress(qs.addr);
- setElectionAddress(qs.electionAddr);
+ setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
}
}
}
@@ -748,42 +762,46 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
}
}
- public InetSocketAddress getQuorumAddress(){
- synchronized (QV_LOCK) {
- return myQuorumAddr;
+ private AddressTuple getAddrs(){
+ AddressTuple addrs = myAddrs.get();
+ if (addrs != null) {
+ return addrs;
}
- }
-
- public void setQuorumAddress(InetSocketAddress addr){
- synchronized (QV_LOCK) {
- myQuorumAddr = addr;
+ try {
+ synchronized (QV_LOCK) {
+ addrs = myAddrs.get();
+ while (addrs == null) {
+ QV_LOCK.wait();
+ addrs = myAddrs.get();
+ }
+ return addrs;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
}
}
- public InetSocketAddress getElectionAddress(){
- synchronized (QV_LOCK) {
- return myElectionAddr;
- }
+ public InetSocketAddress getQuorumAddress(){
+ return getAddrs().quorumAddr;
}
- public void setElectionAddress(InetSocketAddress addr){
- synchronized (QV_LOCK) {
- myElectionAddr = addr;
- }
+ public InetSocketAddress getElectionAddress(){
+ return getAddrs().electionAddr;
}
-
+
public InetSocketAddress getClientAddress(){
- synchronized (QV_LOCK) {
- return myClientAddr;
- }
+ final AddressTuple addrs = myAddrs.get();
+ return (addrs == null) ? null : addrs.clientAddr;
}
-
- public void setClientAddress(InetSocketAddress addr){
+
+ private void setAddrs(InetSocketAddress quorumAddr, InetSocketAddress electionAddr, InetSocketAddress clientAddr){
synchronized (QV_LOCK) {
- myClientAddr = addr;
+ myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr));
+ QV_LOCK.notifyAll();
}
}
-
+
private int electionType;
Election electionAlg;
@@ -1050,7 +1068,12 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
le = new AuthFastLeaderElection(this, true);
break;
case 3:
- qcm = createCnxnManager();
+ QuorumCnxManager qcm = createCnxnManager();
+ QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
+ if (oldQcm != null) {
+ LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
+ oldQcm.halt();
+ }
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
@@ -1515,18 +1538,6 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
}
}
- private void connectNewPeers(){
- synchronized (QV_LOCK) {
- if (qcm != null && quorumVerifier != null && lastSeenQuorumVerifier != null) {
- Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
- for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
- if (e.getKey() != getId() && !committedView.containsKey(e.getKey()))
- qcm.connectOne(e.getKey());
- }
- }
- }
- }
-
public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW){
if (qvOLD == null || !qvOLD.equals(qvNEW)) {
LOG.warn("Restarting Leader Election");
@@ -1544,33 +1555,61 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix;
}
+ // On entry to this method, qcm must be non-null and the locks on both qcm and QV_LOCK
+ // must be held. We don't want quorumVerifier/lastSeenQuorumVerifier to change out from
+ // under us, so we have to hold QV_LOCK; and since the call to qcm.connectOne() will take
+ // the lock on qcm (and take QV_LOCK again inside that), the caller needs to have taken
+ // qcm outside QV_LOCK to avoid a deadlock against other callers of qcm.connectOne().
+ private void connectNewPeers(QuorumCnxManager qcm){
+ if (quorumVerifier != null && lastSeenQuorumVerifier != null) {
+ Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
+ for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
+ if (e.getKey() != getId() && !committedView.containsKey(e.getKey()))
+ qcm.connectOne(e.getKey());
+ }
+ }
+ }
+
public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
- synchronized (QV_LOCK) {
- if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
- LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() +
- ". Current version: " + quorumVerifier.getVersion());
+ // If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm
+ // and then take QV_LOCK. Take the locks in the same order to ensure that we don't
+ // deadlock against other callers of connectOne(). If qcmRef gets set in another
+ // thread while we're inside the synchronized block, that does no harm; if we didn't
+ // take a lock on qcm (because it was null when we sampled it), we won't call
+ // connectOne() on it. (Use of an AtomicReference is enough to guarantee visibility
+ // of updates that provably happen in another thread before entering this method.)
+ QuorumCnxManager qcm = qcmRef.get();
+ Object outerLockObject = (qcm != null) ? qcm : QV_LOCK;
+ synchronized (outerLockObject) {
+ synchronized (QV_LOCK) {
+ if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
+ LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() +
+ ". Current version: " + lastSeenQuorumVerifier.getVersion());
+ }
+ // assuming that a version uniquely identifies a configuration, so if
+ // version is the same, nothing to do here.
+ if (lastSeenQuorumVerifier != null &&
+ lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
+ return;
+ }
+ lastSeenQuorumVerifier = qv;
+ if (qcm != null) {
+ connectNewPeers(qcm);
+ }
- }
- // assuming that a version uniquely identifies a configuration, so if
- // version is the same, nothing to do here.
- if (lastSeenQuorumVerifier != null &&
- lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
- return;
- }
- lastSeenQuorumVerifier = qv;
- connectNewPeers();
- if (writeToDisk) {
- try {
- String fileName = getNextDynamicConfigFilename();
- if (fileName != null) {
- QuorumPeerConfig.writeDynamicConfig(fileName, qv, true);
+ if (writeToDisk) {
+ try {
+ String fileName = getNextDynamicConfigFilename();
+ if (fileName != null) {
+ QuorumPeerConfig.writeDynamicConfig(fileName, qv, true);
+ }
+ } catch (IOException e) {
+ LOG.error("Error writing next dynamic config file to disk", e);
}
- } catch (IOException e) {
- LOG.error("Error writing next dynamic config file to disk: ", e.getMessage());
}
}
}
- }
+ }
public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
synchronized (QV_LOCK) {
@@ -1610,9 +1649,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
}
QuorumServer qs = qv.getAllMembers().get(getId());
if (qs != null) {
- setQuorumAddress(qs.addr);
- setElectionAddress(qs.electionAddr);
- setClientAddress(qs.clientAddr);
+ setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
}
return prevQV;
}
@@ -1795,7 +1832,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
* get reference to QuorumCnxManager
*/
public QuorumCnxManager getQuorumCnxManager() {
- return qcm;
+ return qcmRef.get();
}
private long readLongFromFile(String name) throws IOException {
File file = new File(logFactory.getSnapDir(), name);
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/6ea3c0b6/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
index 9087483..69dac1f 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
@@ -21,11 +21,13 @@ package org.apache.zookeeper.server.quorum;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.txn.TxnHeader;
@@ -37,6 +39,10 @@ import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -56,7 +62,20 @@ public class LeaderBeanTest {
@Before
public void setUp() throws IOException, X509Exception {
qp = new QuorumPeer();
+ long myId = qp.getId();
+
+ int clientPort = PortAssignment.unique();
+ Map<Long, QuorumServer> peersView = new HashMap<>();
+ InetAddress clientIP = InetAddress.getLoopbackAddress();
+
+ peersView.put(Long.valueOf(myId),
+ new QuorumServer(myId, new InetSocketAddress(clientIP, PortAssignment.unique()),
+ new InetSocketAddress(clientIP, PortAssignment.unique()),
+ new InetSocketAddress(clientIP, clientPort), LearnerType.PARTICIPANT));
+
QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class);
+ when(quorumVerifierMock.getAllMembers()).thenReturn(peersView);
+
qp.setQuorumVerifier(quorumVerifierMock, false);
File tmpDir = ClientBase.createEmptyTestDir();
fileTxnSnapLog = new FileTxnSnapLog(new File(tmpDir, "data"),
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/6ea3c0b6/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
index 7b39ab1..891a7d2 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
@@ -839,7 +839,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
testNormalOperation(zkArr[4], zkArr[5]);
for (int i = 1; i <= 5; i++) {
- if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumHierarchical))
+ if (!(qu.getPeer(i).peer.getQuorumVerifier() instanceof QuorumHierarchical))
Assert.fail("peer " + i
+ " doesn't think the quorum system is Hieararchical!");
}
@@ -876,7 +876,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
testNormalOperation(zkArr[1], zkArr[2]);
for (int i = 1; i <= 2; i++) {
- if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumMaj))
+ if (!(qu.getPeer(i).peer.getQuorumVerifier() instanceof QuorumMaj))
Assert.fail("peer "
+ i
+ " doesn't think the quorum system is a majority quorum system!");
|