zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject zookeeper git commit: ZOOKEEPER-2778: QuorumPeer: encapsulate addresses
Date Fri, 07 Dec 2018 12:11:45 GMT
Repository: zookeeper
Updated Branches:
  refs/heads/branch-3.5 c6bd49c4c -> 914c971a7


ZOOKEEPER-2778: QuorumPeer: encapsulate addresses

[ZOOKEEPER-2778] QuorumPeer: encapsulate quorum/election/client addresses in an AddressTuple
held through an AtomicReference

Author: Michael Edwards <Michael Edwards>
Author: Michael Edwards <medwards@bitpusher.com>

Reviewers: andor@apache.org

Closes #707 from mkedwards/branch-3.5 and squashes the following commits:

78df674c4 [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer: halt old QCM when clobbering existing election algorithm
5038179e2 [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer: warn when clobbering existing election algorithm
bbeeebf87 [Michael Edwards] [ZOOKEEPER-2778] LeaderBeanTest: set up mock QuorumVerifier so that addresses get set
9701f0576 [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer: fix access to newly private data members from ReconfigTest
0531d9c8e [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer: fixes from code review
03d259bae [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer: add fast path for already-non-null quorum/election address
4cd10c865 [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer/QuorumCnxManager: address deadlock and visibility issues
3694a4e31 [Michael Edwards] [ZOOKEEPER-2778] QuorumPeer: encapsulate quorum/election/client addresses in an AddressTuple held through an AtomicReference


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/914c971a
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/914c971a
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/914c971a

Branch: refs/heads/branch-3.5
Commit: 914c971a749fae624fd7610f94d761ca200440e8
Parents: c6bd49c
Author: Michael Edwards <Michael Edwards>
Authored: Fri Dec 7 13:11:39 2018 +0100
Committer: Andor Molnar <andor@apache.org>
Committed: Fri Dec 7 13:11:39 2018 +0100

----------------------------------------------------------------------
 .../server/quorum/QuorumCnxManager.java         |   3 +-
 .../zookeeper/server/quorum/QuorumPeer.java     | 177 +++++++++++--------
 .../zookeeper/server/quorum/LeaderBeanTest.java |  22 +++
 .../org/apache/zookeeper/test/ReconfigTest.java |   4 +-
 4 files changed, 133 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/914c971a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index 209cbcd..519b019 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -403,7 +403,8 @@ public class QuorumCnxManager {
             // represents protocol version (in other words - message type)
             dout.writeLong(PROTOCOL_VERSION);
             dout.writeLong(self.getId());
-            String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
+            final InetSocketAddress electionAddr = self.getElectionAddress();
+            String addr = electionAddr.getHostString() + ":" + electionAddr.getPort();
             byte[] addr_bytes = addr.getBytes();
             dout.writeInt(addr_bytes.length);
             dout.write(addr_bytes);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/914c971a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index a2253a2..260ccd9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -42,6 +42,7 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.sasl.SaslException;
 
@@ -109,7 +110,11 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
     LocalPeerBean jmxLocalPeerBean;
     private Map<Long, RemotePeerBean> jmxRemotePeerBean;
     LeaderElectionBean jmxLeaderElectionBean;
-    private QuorumCnxManager qcm;
+
+    // The QuorumCnxManager is held through an AtomicReference to ensure cross-thread visibility
+    // of updates; see the implementation comment at setLastSeenQuorumVerifier().
+    private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();
+
     QuorumAuthServer authServer;
     QuorumAuthLearner authLearner;
 
@@ -122,6 +127,18 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
      */
     private ZKDatabase zkDb;
 
+    public static final class AddressTuple {
+        public final InetSocketAddress quorumAddr;
+        public final InetSocketAddress electionAddr;
+        public final InetSocketAddress clientAddr;
+
+        public AddressTuple(InetSocketAddress quorumAddr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) {
+            this.quorumAddr = quorumAddr;
+            this.electionAddr = electionAddr;
+            this.clientAddr = clientAddr;
+        }
+    }
+
     public static class QuorumServer {
         public InetSocketAddress addr = null;
 
@@ -456,10 +473,10 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
      */
 
     //last committed quorum verifier
-    public QuorumVerifier quorumVerifier;
+    private QuorumVerifier quorumVerifier;
     
     //last proposed quorum verifier
-    public QuorumVerifier lastSeenQuorumVerifier = null;
+    private QuorumVerifier lastSeenQuorumVerifier = null;
 
     // Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier.
     final Object QV_LOCK = new Object();
@@ -730,16 +747,14 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
 
     DatagramSocket udpSocket;
 
-    private InetSocketAddress myQuorumAddr;
-    private InetSocketAddress myElectionAddr = null;
-    private InetSocketAddress myClientAddr = null;
+    private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>();
 
     /**
      * Resolves hostname for a given server ID.
      *
      * This method resolves hostname for a given server ID in both quorumVerifer
      * and lastSeenQuorumVerifier. If the server ID matches the local server ID,
-     * it also updates myQuorumAddr and myElectionAddr.
+     * it also updates myAddrs.
      */
     public void recreateSocketAddresses(long id) {
         QuorumVerifier qv = getQuorumVerifier();
@@ -748,8 +763,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             if (qs != null) {
                 qs.recreateSocketAddresses();
                 if (id == getId()) {
-                    setQuorumAddress(qs.addr);
-                    setElectionAddress(qs.electionAddr);
+                    setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
                 }
             }
         }
@@ -762,39 +776,43 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         }
     }
 
-    public InetSocketAddress getQuorumAddress(){
-        synchronized (QV_LOCK) {
-            return myQuorumAddr;
+    private AddressTuple getAddrs(){
+        AddressTuple addrs = myAddrs.get();
+        if (addrs != null) {
+            return addrs;
         }
-    }
-    
-    public void setQuorumAddress(InetSocketAddress addr){
-        synchronized (QV_LOCK) {
-            myQuorumAddr = addr;
+        try {
+            synchronized (QV_LOCK) {
+                addrs = myAddrs.get();
+                while (addrs == null) {
+                    QV_LOCK.wait();
+                    addrs = myAddrs.get();
+                }
+                return addrs;
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
         }
     }
 
+    public InetSocketAddress getQuorumAddress(){
+        return getAddrs().quorumAddr;
+    }
+    
     public InetSocketAddress getElectionAddress(){
-        synchronized (QV_LOCK) {
-            return myElectionAddr;
-        }
+        return getAddrs().electionAddr;
     }
 
-    public void setElectionAddress(InetSocketAddress addr){
-        synchronized (QV_LOCK) {
-            myElectionAddr = addr;
-        }
-    }
-    
     public InetSocketAddress getClientAddress(){
-        synchronized (QV_LOCK) {
-            return myClientAddr;
-        }
+        final AddressTuple addrs = myAddrs.get();
+        return (addrs == null) ? null : addrs.clientAddr;
     }
     
-    public void setClientAddress(InetSocketAddress addr){
+    private void setAddrs(InetSocketAddress quorumAddr, InetSocketAddress electionAddr, InetSocketAddress clientAddr){
         synchronized (QV_LOCK) {
-            myClientAddr = addr;
+            myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr));
+            QV_LOCK.notifyAll();
         }
     }
     
@@ -961,7 +979,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         //}
         if (electionType == 0) {
             try {
-                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
+                udpSocket = new DatagramSocket(getQuorumAddress().getPort());
                 responder = new ResponderThread();
                 responder.start();
             } catch (SocketException e) {
@@ -1077,7 +1095,12 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             le = new AuthFastLeaderElection(this, true);
             break;
         case 3:
-            qcm = createCnxnManager();
+            QuorumCnxManager qcm = createCnxnManager();
+            QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
+            if (oldQcm != null) {
+                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
+                oldQcm.halt();
+            }
             QuorumCnxManager.Listener listener = qcm.listener;
             if(listener != null){
                 listener.start();
@@ -1544,18 +1567,6 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         }
     }
     
-    private void connectNewPeers(){
-        synchronized (QV_LOCK) {
-            if (qcm != null && quorumVerifier != null && lastSeenQuorumVerifier != null) {
-                Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
-                for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
-                    if (e.getKey() != getId() && !committedView.containsKey(e.getKey()))
-                        qcm.connectOne(e.getKey());
-                }
-            }
-        }
-    }
-
     public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW){
         if (qvOLD == null || !qvOLD.equals(qvNEW)) {
             LOG.warn("Restarting Leader Election");
@@ -1573,33 +1584,61 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix;
     }
     
+    // On entry to this method, qcm must be non-null and the locks on both qcm and QV_LOCK
+    // must be held.  We don't want quorumVerifier/lastSeenQuorumVerifier to change out from
+    // under us, so we have to hold QV_LOCK; and since the call to qcm.connectOne() will take
+    // the lock on qcm (and take QV_LOCK again inside that), the caller needs to have taken
+    // qcm outside QV_LOCK to avoid a deadlock against other callers of qcm.connectOne().
+    private void connectNewPeers(QuorumCnxManager qcm){
+        if (quorumVerifier != null && lastSeenQuorumVerifier != null) {
+            Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
+            for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
+                if (e.getKey() != getId() && !committedView.containsKey(e.getKey()))
+                    qcm.connectOne(e.getKey());
+            }
+        }
+    }
+
     public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
-        synchronized (QV_LOCK) {
-            if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
-                LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() +
-                        ". Current version: " + quorumVerifier.getVersion());
+        // If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm
+        // and then take QV_LOCK.  Take the locks in the same order to ensure that we don't
+        // deadlock against other callers of connectOne().  If qcmRef gets set in another
+        // thread while we're inside the synchronized block, that does no harm; if we didn't
+        // take a lock on qcm (because it was null when we sampled it), we won't call
+        // connectOne() on it.  (Use of an AtomicReference is enough to guarantee visibility
+        // of updates that provably happen in another thread before entering this method.)
+        QuorumCnxManager qcm = qcmRef.get();
+        Object outerLockObject = (qcm != null) ? qcm : QV_LOCK;
+        synchronized (outerLockObject) {
+            synchronized (QV_LOCK) {
+                if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
+                    LOG.error("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() +
+                            ". Current version: " + quorumVerifier.getVersion());
+                }
+                // assuming that a version uniquely identifies a configuration, so if
+                // version is the same, nothing to do here.
+                if (lastSeenQuorumVerifier != null &&
+                        lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
+                    return;
+                }
+                lastSeenQuorumVerifier = qv;
+                if (qcm != null) {
+                    connectNewPeers(qcm);
+                }
 
-            }
-            // assuming that a version uniquely identifies a configuration, so if
-            // version is the same, nothing to do here.
-            if (lastSeenQuorumVerifier != null &&
-                    lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
-                return;
-            }
-            lastSeenQuorumVerifier = qv;
-            connectNewPeers();
-            if (writeToDisk) {
-                try {
-                    String fileName = getNextDynamicConfigFilename();
-                    if (fileName != null) {
-                        QuorumPeerConfig.writeDynamicConfig(fileName, qv, true);
+                if (writeToDisk) {
+                    try {
+                        String fileName = getNextDynamicConfigFilename();
+                        if (fileName != null) {
+                            QuorumPeerConfig.writeDynamicConfig(fileName, qv, true);
+                        }
+                    } catch (IOException e) {
+                        LOG.error("Error writing next dynamic config file to disk: ", e.getMessage());
                     }
-                } catch (IOException e) {
-                    LOG.error("Error writing next dynamic config file to disk: ", e.getMessage());
                 }
             }
         }
-     }       
+    }
     
     public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
         synchronized (QV_LOCK) {
@@ -1639,9 +1678,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             }
             QuorumServer qs = qv.getAllMembers().get(getId());
             if (qs != null) {
-                setQuorumAddress(qs.addr);
-                setElectionAddress(qs.electionAddr);
-                setClientAddress(qs.clientAddr);
+                setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
             }
             return prevQV;
         }
@@ -1820,7 +1857,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
      * get reference to QuorumCnxManager
      */
     public QuorumCnxManager getQuorumCnxManager() {
-        return qcm;
+        return qcmRef.get();
     }
     private long readLongFromFile(String name) throws IOException {
         File file = new File(logFactory.getSnapDir(), name);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/914c971a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
index 31cef79..99d5b5d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderBeanTest.java
@@ -21,10 +21,13 @@ package org.apache.zookeeper.server.quorum;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.txn.TxnHeader;
@@ -36,6 +39,10 @@ import org.mockito.stubbing.Answer;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -43,6 +50,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class LeaderBeanTest {
     private Leader leader;
@@ -54,8 +62,22 @@ public class LeaderBeanTest {
     @Before
     public void setUp() throws IOException, X509Exception {
         qp = new QuorumPeer();
+        long myId = qp.getId();
+
+        int clientPort = PortAssignment.unique();
+        Map<Long, QuorumServer> peersView = new HashMap<Long, QuorumServer>();
+        InetAddress clientIP = InetAddress.getLoopbackAddress();
+
+        peersView.put(Long.valueOf(myId),
+                new QuorumServer(myId, new InetSocketAddress(clientIP, PortAssignment.unique()),
+                        new InetSocketAddress(clientIP, PortAssignment.unique()),
+                        new InetSocketAddress(clientIP, clientPort), LearnerType.PARTICIPANT));
+
         QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class);
+        when(quorumVerifierMock.getAllMembers()).thenReturn(peersView);
+
         qp.setQuorumVerifier(quorumVerifierMock, false);
+
         File tmpDir = ClientBase.createTmpDir();
         fileTxnSnapLog = new FileTxnSnapLog(new File(tmpDir, "data"),
                 new File(tmpDir, "data_txnlog"));

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/914c971a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
index a050f7a..fb0e5f0 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
@@ -844,7 +844,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         testNormalOperation(zkArr[4], zkArr[5]);
 
         for (int i = 1; i <= 5; i++) {
-            if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumHierarchical))
+            if (!(qu.getPeer(i).peer.getQuorumVerifier() instanceof QuorumHierarchical))
                 Assert.fail("peer " + i
                         + " doesn't think the quorum system is Hieararchical!");
         }
@@ -881,7 +881,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
         testNormalOperation(zkArr[1], zkArr[2]);
 
         for (int i = 1; i <= 2; i++) {
-            if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumMaj))
+            if (!(qu.getPeer(i).peer.getQuorumVerifier() instanceof QuorumMaj))
                 Assert.fail("peer "
                         + i
                         + " doesn't think the quorum system is a majority quorum system!");


Mime
View raw message