distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [03/29] incubator-distributedlog git commit: Improve handling of lock conflicts in zk session lock
Date Wed, 21 Dec 2016 08:00:11 GMT
Improve handling of lock conflicts in zk session lock

- lock reacquire could happen in both the foreground and a background thread, so use a semaphore
to make sure there is only one outstanding acquire operation, and check whether the lock is
already held before reacquiring.
- fix the zk sibling znode handling logic: as the znode is a sequential znode, its name would be
different each time, so only compare the client id and session id of the znodes.

RB_ID=833945


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/7b46a9ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/7b46a9ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/7b46a9ac

Branch: refs/heads/merge/DL-98
Commit: 7b46a9ac6bb5d520366069c244332347ef019e8e
Parents: 0091960
Author: Sijie Guo <sijieg@twitter.com>
Authored: Mon May 23 16:49:19 2016 -0700
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Mon Dec 12 16:28:58 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/lock/ZKSessionLock.java      | 35 ++++++++++++--
 .../distributedlog/lock/TestZKSessionLock.java  | 49 ++++++++++++++++++++
 2 files changed, 80 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7b46a9ac/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
index 87894dc..dc57d55 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
@@ -480,6 +480,31 @@ class ZKSessionLock implements SessionLock {
         return id;
     }
 
+    static boolean areLockWaitersInSameSession(String node1, String node2) {
+        String[] parts1 = node1.split("_");
+        String[] parts2 = node2.split("_");
+        if (parts1.length != 4 || parts2.length != 4) {
+            return node1.equals(node2);
+        }
+        if (!parts1[2].startsWith("s") || !parts2[2].startsWith("s")) {
+            return node1.equals(node2);
+        }
+        long sessionOwner1 = Long.parseLong(parts1[2].substring(1));
+        long sessionOwner2 = Long.parseLong(parts2[2].substring(1));
+        if (sessionOwner1 != sessionOwner2) {
+            return false;
+        }
+        String clientId1, clientId2;
+        try {
+            clientId1 = URLDecoder.decode(parts1[1], UTF_8.name());
+            clientId2 = URLDecoder.decode(parts2[1], UTF_8.name());
+            return clientId1.equals(clientId2);
+        } catch (UnsupportedEncodingException e) {
+            // if failed to parse client id, we have to get client id by zookeeper#getData.
+            return node1.equals(node2);
+        }
+    }
+
     /**
      * Get client id and its ephemeral owner.
      *
@@ -1209,17 +1234,19 @@ class ZKSessionLock implements SessionLock {
             @Override
             public void execute() {
                 boolean shouldWatch;
+                final boolean shouldClaimOwnership;
                 if (lockContext.hasLockId(currentOwner) && siblingNode.equals(ownerNode))
{
                     // if the current owner is the znode left from previous session
                     // we should watch it and claim ownership
                     shouldWatch = true;
+                    shouldClaimOwnership = true;
                     LOG.info("LockWatcher {} for {} found its previous session {} held lock,
watch it to claim ownership.",
                             new Object[] { myNode, lockPath, currentOwner });
-                } else if (lockId.compareTo(currentOwner) == 0 && siblingNode.equals(ownerNode))
{
+                } else if (lockId.compareTo(currentOwner) == 0 && areLockWaitersInSameSession(siblingNode,
ownerNode)) {
                     // I found that my sibling is the current owner with same lock id (client
id & session id)
                     // It must be left by any race condition from same zookeeper client
-                    // I would watch owner instead of sibling
                     shouldWatch = true;
+                    shouldClaimOwnership = true;
                     LOG.info("LockWatcher {} for {} found itself {} already held lock at
sibling node {}, watch it to claim ownership.",
                             new Object[]{myNode, lockPath, lockId, siblingNode});
                 } else {
@@ -1230,6 +1257,7 @@ class ZKSessionLock implements SessionLock {
                                     new Object[]{lockPath, myNode, siblingNode, System.currentTimeMillis()});
                         }
                     }
+                    shouldClaimOwnership = false;
                 }
 
                 // watch sibling for lock ownership
@@ -1247,8 +1275,7 @@ class ZKSessionLock implements SessionLock {
                                     }
 
                                     if (KeeperException.Code.OK.intValue() == rc) {
-                                        if (siblingNode.equals(ownerNode) &&
-                                                (lockId.compareTo(currentOwner) == 0 || lockContext.hasLockId(currentOwner)))
{
+                                        if (shouldClaimOwnership) {
                                             // watch owner successfully
                                             LOG.info("LockWatcher {} claimed ownership for
{} after set watcher on {}.",
                                                     new Object[]{ myNode, lockPath, ownerNode
});

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7b46a9ac/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
index 629538e..054d714 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
@@ -180,6 +180,28 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
     }
 
     @Test(timeout = 60000)
+    public void testAreLockWaitersInSameSession() throws Exception {
+        ZooKeeper zk = zkc.get();
+
+        String lockPath = "/test-are-lock-waiters-in-same-session";
+        String clientId1 = "test-are-lock-waiters-in-same-session-1";
+        String clientId2 = "test-are-lock-waiters-in-same-session-2";
+
+        createLockPath(zk, lockPath);
+
+        String node1 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1));
+        String node2 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId2));
+        String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1));
+
+        assertEquals(node1 + " and " + node3 + " should be in same session.",
+                true, areLockWaitersInSameSession(node1, node3));
+        assertEquals(node1 + " and " + node2 + " should be not in same session.",
+                false, areLockWaitersInSameSession(node1, node2));
+        assertEquals(node3 + " and " + node2 + " should be not in same session.",
+                false, areLockWaitersInSameSession(node3, node2));
+    }
+
+    @Test(timeout = 60000)
     public void testExecuteLockAction() throws Exception {
         String lockPath = "/test-execute-lock-action";
         String clientId = "test-execute-lock-action-" + System.currentTimeMillis();
@@ -921,6 +943,33 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         lock1_1.unlock();
     }
 
+    @Test(timeout = 60000)
+    public void testLockWithMultipleSiblingWaiters() throws Exception {
+        String lockPath = "/test-lock-with-multiple-sibling-waiters";
+        String clientId = "client-id";
+
+        createLockPath(zkc.get(), lockPath);
+
+        final ZKSessionLock lock0 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
+        final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
+        final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
+
+        lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        lock1.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+
+        List<String> children = awaitWaiters(3, zkc, lockPath);
+
+        assertEquals(3, children.size());
+        assertEquals(State.CLAIMED, lock0.getLockState());
+        assertEquals(State.CLAIMED, lock1.getLockState());
+        assertEquals(State.CLAIMED, lock2.getLockState());
+
+        lock0.unlock();
+        lock1.unlock();
+        lock2.unlock();
+    }
+
     /**
      * Immediate lock and unlock first lock
      * @throws Exception


Mime
View raw message