curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [07/10] curator git commit: Moved findAndDeleteProtectedNodeInBackground code into separate operation that is processed through the standard Curator background code. This way, retries are applied (with sleep), etc. In the previous implementation, errors
Date Mon, 11 Jan 2016 00:30:56 GMT
Moved the findAndDeleteProtectedNodeInBackground code into a separate operation that is
processed through the standard Curator background code. This way, retries are applied
(with sleep), etc. In the previous implementation, errors caused the background check to
be re-run immediately and indefinitely.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9b68e19a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9b68e19a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9b68e19a

Branch: refs/heads/CURATOR-3.0
Commit: 9b68e19a278e025fa5884445a2b2519463b57445
Parents: 31c0465
Author: randgalt <randgalt@apache.org>
Authored: Mon Dec 28 10:08:51 2015 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Mon Dec 28 10:08:51 2015 -0500

----------------------------------------------------------------------
 .../framework/imps/CreateBuilderImpl.java       |  68 +-----------
 .../FindAndDeleteProtectedNodeInBackground.java | 107 +++++++++++++++++++
 .../framework/imps/TestFrameworkEdges.java      |   1 +
 .../recipes/queue/DistributedQueue.java         |   5 +-
 4 files changed, 116 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/9b68e19a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index b72b7b6..a9cb600 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -24,7 +24,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.TimeTrace;
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.*;
 import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
 import org.apache.curator.framework.api.transaction.OperationType;
@@ -464,13 +463,13 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
         }
         else
         {
-            String path = protectedPathInForeground(givenPath, adjustedPath, data);
+            String path = protectedPathInForeground(adjustedPath, data);
             returnPath = client.unfixForNamespace(path);
         }
         return returnPath;
     }
 
-    private String protectedPathInForeground(String givenPath, String adjustedPath, byte[]
data) throws Exception
+    private String protectedPathInForeground(String adjustedPath, byte[] data) throws Exception
     {
         try
         {
@@ -485,8 +484,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
                  * CURATOR-45 + CURATOR-79: we don't know if the create operation was successful or not,
                  * register the znode to be sure it is deleted later.
                  */
-                String localProtectedId = protectedId;
-                findAndDeleteProtectedNodeInBackground(givenPath, localProtectedId, null);
+                new FindAndDeleteProtectedNodeInBackground(client, ZKPaths.getPathAndNode(adjustedPath).getPath(),
protectedId).execute();
                 /*
                 * The current UUID is scheduled to be deleted, it is not safe to use it again.
                 * If this builder is used again later create a new UUID
@@ -635,7 +633,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
                     if ( doProtected )
                     {
                         // all retries have failed, findProtectedNodeInForeground(..) included, schedule a clean up
-                        findAndDeleteProtectedNodeInBackground(givenPath, protectedId, null);
+                        new FindAndDeleteProtectedNodeInBackground(client, ZKPaths.getPathAndNode(path).getPath(),
protectedId).execute();
                         // assign a new id if this builder is used again later
                         protectedId = UUID.randomUUID().toString();
                     }
@@ -793,62 +791,6 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
     }
 
     /**
-     * Attempt to delete a protected znode
-     *
-     * @param unAdjustedPath the path - raw without namespace resolution
-     * @param protectedId    the protected id
-     * @param callback       callback to use, <code>null</code> to create a new one
-     */
-    private void findAndDeleteProtectedNodeInBackground(String unAdjustedPath, String protectedId,
FindProtectedNodeCB callback)
-    {
-        if ( client.getState() == CuratorFrameworkState.STARTED )
-        {
-            if ( callback == null )
-            {
-                callback = new FindProtectedNodeCB(unAdjustedPath, protectedId);
-            }
-            try
-            {
-                client.getChildren().inBackground(callback).forPath(ZKPaths.getPathAndNode(unAdjustedPath).getPath());
-            }
-            catch ( Exception e )
-            {
-                findAndDeleteProtectedNodeInBackground(unAdjustedPath, protectedId, callback);
-            }
-        }
-    }
-
-    private class FindProtectedNodeCB implements BackgroundCallback
-    {
-        final String path;
-        final String protectedId;
-
-        private FindProtectedNodeCB(String path, String protectedId)
-        {
-            this.path = path;
-            this.protectedId = protectedId;
-        }
-
-        @Override
-        public void processResult(CuratorFramework ignoreClient, CuratorEvent event) throws
Exception
-        {
-            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
-            {
-                final String node = findNode(event.getChildren(), ZKPaths.getPathAndNode(path).getPath(),
protectedId);
-                if ( node != null )
-                {
-                    client.delete().guaranteed().inBackground().forPath(node);
-                }
-            }
-            else if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue()
)
-            {
-                // retry
-                findAndDeleteProtectedNodeInBackground(path, protectedId, this);
-            }
-        }
-    }
-
-    /**
      * Attempt to find the znode that matches the given path and protected id
      *
      * @param children    a list of candidates znodes
@@ -856,7 +798,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
      * @param protectedId the protected id
      * @return the absolute path of the znode or <code>null</code> if it is not found
      */
-    private static String findNode(final List<String> children, final String path,
final String protectedId)
+    static String findNode(final List<String> children, final String path, final String
protectedId)
     {
         final String protectedPrefix = getProtectedPrefix(protectedId);
         String foundNode = Iterables.find

http://git-wip-us.apache.org/repos/asf/curator/blob/9b68e19a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java
new file mode 100644
index 0000000..7b5073b
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.TimeTrace;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+class FindAndDeleteProtectedNodeInBackground implements BackgroundOperation<Void>
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFrameworkImpl client;
+    private final String namespaceAdjustedParentPath;
+    private final String protectedId;
+
+    FindAndDeleteProtectedNodeInBackground(CuratorFrameworkImpl client, String namespaceAdjustedParentPath,
String protectedId)
+    {
+        this.client = client;
+        this.namespaceAdjustedParentPath = namespaceAdjustedParentPath;
+        this.protectedId = protectedId;
+    }
+
+    void execute()
+    {
+        OperationAndData.ErrorCallback<Void> errorCallback = new OperationAndData.ErrorCallback<Void>()
+        {
+            @Override
+            public void retriesExhausted(OperationAndData<Void> operationAndData)
+            {
+                client.processBackgroundOperation(operationAndData, null);
+            }
+        };
+        OperationAndData<Void> operationAndData = new OperationAndData<Void>(this,
null, null, errorCallback, null);
+        client.processBackgroundOperation(operationAndData, null);
+    }
+
+    @VisibleForTesting
+    static final AtomicBoolean debugInsertError = new AtomicBoolean(false);
+
+    @Override
+    public void performBackgroundOperation(final OperationAndData<Void> operationAndData)
throws Exception
+    {
+        final TimeTrace trace = client.getZookeeperClient().startTracer("FindAndDeleteProtectedNodeInBackground");
+        AsyncCallback.Children2Callback callback = new AsyncCallback.Children2Callback()
+        {
+            @Override
+            public void processResult(int rc, String path, Object o, List<String> strings,
Stat stat)
+            {
+                trace.commit();
+
+                if ( debugInsertError.compareAndSet(true, false) )
+                {
+                    rc = KeeperException.Code.CONNECTIONLOSS.intValue();
+                }
+
+                if ( rc == KeeperException.Code.OK.intValue() )
+                {
+                    final String node = CreateBuilderImpl.findNode(strings, "/", protectedId);
 // due to namespacing, don't let CreateBuilderImpl.findNode adjust the path
+                    if ( node != null )
+                    {
+                        try
+                        {
+                            String deletePath = client.unfixForNamespace(ZKPaths.makePath(namespaceAdjustedParentPath,
node));
+                            client.delete().guaranteed().inBackground().forPath(deletePath);
+                        }
+                        catch ( Exception e )
+                        {
+                            log.error("Could not start guaranteed delete for node: " + node);
+                            rc = KeeperException.Code.CONNECTIONLOSS.intValue();
+                        }
+                    }
+                }
+
+                if ( rc != KeeperException.Code.OK.intValue() )
+                {
+                    CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.CHILDREN,
rc, path, null, o, stat, null, strings, null, null);
+                    client.processBackgroundOperation(operationAndData, event);
+                }
+            }
+        };
+        client.getZooKeeper().getChildren(namespaceAdjustedParentPath, false, callback, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/9b68e19a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 95c3792..cefc1e7 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -72,6 +72,7 @@ public class TestFrameworkEdges extends BaseClassForTests
 
                 CreateBuilderImpl createBuilder = (CreateBuilderImpl)localClient.create();
                 createBuilder.failNextCreateForTesting = true;
+                FindAndDeleteProtectedNodeInBackground.debugInsertError.set(true);
                 try
                 {
                     createBuilder.withProtection().forPath("/parent/test");

http://git-wip-us.apache.org/repos/asf/curator/blob/9b68e19a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
index 3b63956..3100fde 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
@@ -583,6 +583,7 @@ public class DistributedQueue<T> implements QueueBase<T>
         final Semaphore processedLatch = new Semaphore(0);
         final boolean   isUsingLockSafety = (lockPath != null);
         int             min = minItemsBeforeRefresh;
+        int             submittedQty = 0;
         for ( final String itemNode : children )
         {
             if ( Thread.currentThread().isInterrupted() )
@@ -602,7 +603,6 @@ public class DistributedQueue<T> implements QueueBase<T>
             {
                 if ( refreshOnWatch && (currentVersion != childrenCache.getData().version)
)
                 {
-                    processedLatch.release(children.size());
                     break;
                 }
             }
@@ -642,9 +642,10 @@ public class DistributedQueue<T> implements QueueBase<T>
                     }
                 }
             );
+            ++submittedQty;
         }
 
-        processedLatch.acquire(children.size());
+        processedLatch.acquire(submittedQty);
     }
 
     private enum ProcessMessageBytesCode


Mime
View raw message