Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7E76118798 for ; Mon, 11 Jan 2016 00:30:27 +0000 (UTC) Received: (qmail 37776 invoked by uid 500); 11 Jan 2016 00:30:27 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 37743 invoked by uid 500); 11 Jan 2016 00:30:27 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 37694 invoked by uid 99); 11 Jan 2016 00:30:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Jan 2016 00:30:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C293EE0252; Mon, 11 Jan 2016 00:30:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Date: Mon, 11 Jan 2016 00:30:26 -0000 Message-Id: <68a98832567c4085938cffe2a42ccc57@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] curator git commit: Moved findAndDeleteProtectedNodeInBackground code into separate operation that is processed through the standard Curator background code. This way, retries are applied (with sleep), etc. In the previous implementation, errors ca Repository: curator Updated Branches: refs/heads/master 31c04658c -> 45332f301 Moved findAndDeleteProtectedNodeInBackground code into separate operation that is processed through the standard Curator background code. This way, retries are applied (with sleep), etc. In the previous implementation, errors caused the background check to be run immediately and infinitely. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9b68e19a Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9b68e19a Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9b68e19a Branch: refs/heads/master Commit: 9b68e19a278e025fa5884445a2b2519463b57445 Parents: 31c0465 Author: randgalt Authored: Mon Dec 28 10:08:51 2015 -0500 Committer: randgalt Committed: Mon Dec 28 10:08:51 2015 -0500 ---------------------------------------------------------------------- .../framework/imps/CreateBuilderImpl.java | 68 +----------- .../FindAndDeleteProtectedNodeInBackground.java | 107 +++++++++++++++++++ .../framework/imps/TestFrameworkEdges.java | 1 + .../recipes/queue/DistributedQueue.java | 5 +- 4 files changed, 116 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/9b68e19a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index b72b7b6..a9cb600 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -24,7 +24,6 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; -import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.*; import org.apache.curator.framework.api.transaction.CuratorTransactionBridge; import org.apache.curator.framework.api.transaction.OperationType; @@ -464,13 +463,13 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperationnull to create a new one - */ - private void findAndDeleteProtectedNodeInBackground(String unAdjustedPath, String protectedId, FindProtectedNodeCB callback) - { - if ( client.getState() == CuratorFrameworkState.STARTED ) - { - if ( callback == null ) - { - callback = new FindProtectedNodeCB(unAdjustedPath, protectedId); - } - try - { - client.getChildren().inBackground(callback).forPath(ZKPaths.getPathAndNode(unAdjustedPath).getPath()); - } - catch ( Exception e ) - { - findAndDeleteProtectedNodeInBackground(unAdjustedPath, protectedId, callback); - } - } - } - - private class FindProtectedNodeCB implements BackgroundCallback - { - final String path; - final String protectedId; - - private FindProtectedNodeCB(String path, String protectedId) - { - this.path = path; - this.protectedId = protectedId; - } - - @Override - public void processResult(CuratorFramework ignoreClient, CuratorEvent event) throws Exception - { - if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) - { - final String node = findNode(event.getChildren(), ZKPaths.getPathAndNode(path).getPath(), protectedId); - if ( node != null ) - { - client.delete().guaranteed().inBackground().forPath(node); - } - } - else if ( event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue() ) - { - // retry - findAndDeleteProtectedNodeInBackground(path, protectedId, this); - } - } - } - - /** * Attempt to find the znode that matches the given path and protected id * * @param children a list of candidates znodes @@ -856,7 +798,7 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperationnull if it is not found */ - private static String findNode(final List children, final String path, final String protectedId) + static String findNode(final List children, final String path, final String protectedId) { final String protectedPrefix = getProtectedPrefix(protectedId); String foundNode = Iterables.find http://git-wip-us.apache.org/repos/asf/curator/blob/9b68e19a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java new file mode 100644 index 0000000..7b5073b --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.curator.TimeTrace; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +class FindAndDeleteProtectedNodeInBackground implements BackgroundOperation +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final CuratorFrameworkImpl client; + private final String namespaceAdjustedParentPath; + private final String protectedId; + + FindAndDeleteProtectedNodeInBackground(CuratorFrameworkImpl client, String namespaceAdjustedParentPath, String protectedId) + { + this.client = client; + this.namespaceAdjustedParentPath = namespaceAdjustedParentPath; + this.protectedId = protectedId; + } + + void execute() + { + OperationAndData.ErrorCallback errorCallback = new OperationAndData.ErrorCallback() + { + @Override + public void retriesExhausted(OperationAndData operationAndData) + { + client.processBackgroundOperation(operationAndData, null); + } + }; + OperationAndData operationAndData = new OperationAndData(this, null, null, errorCallback, null); + client.processBackgroundOperation(operationAndData, null); + } + + @VisibleForTesting + static final AtomicBoolean debugInsertError = new AtomicBoolean(false); + + @Override + public void performBackgroundOperation(final OperationAndData operationAndData) throws Exception + { + final TimeTrace trace = client.getZookeeperClient().startTracer("FindAndDeleteProtectedNodeInBackground"); + AsyncCallback.Children2Callback callback = new AsyncCallback.Children2Callback() + { + @Override + public void processResult(int rc, String path, Object o, List strings, Stat stat) + { + trace.commit(); + + if ( debugInsertError.compareAndSet(true, false) ) + { + rc = KeeperException.Code.CONNECTIONLOSS.intValue(); + } + + if ( rc == KeeperException.Code.OK.intValue() ) + { + final String node = CreateBuilderImpl.findNode(strings, "/", protectedId); // due to namespacing, don't let CreateBuilderImpl.findNode adjust the path + if ( node != null ) + { + try + { + String deletePath = client.unfixForNamespace(ZKPaths.makePath(namespaceAdjustedParentPath, node)); + client.delete().guaranteed().inBackground().forPath(deletePath); + } + catch ( Exception e ) + { + log.error("Could not start guaranteed delete for node: " + node); + rc = KeeperException.Code.CONNECTIONLOSS.intValue(); + } + } + } + + if ( rc != KeeperException.Code.OK.intValue() ) + { + CuratorEventImpl event = new CuratorEventImpl(client, CuratorEventType.CHILDREN, rc, path, null, o, stat, null, strings, null, null); + client.processBackgroundOperation(operationAndData, event); + } + } + }; + client.getZooKeeper().getChildren(namespaceAdjustedParentPath, false, callback, null); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/9b68e19a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java index 95c3792..cefc1e7 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java @@ -72,6 +72,7 @@ public class TestFrameworkEdges extends BaseClassForTests CreateBuilderImpl createBuilder = (CreateBuilderImpl)localClient.create(); createBuilder.failNextCreateForTesting = true; + FindAndDeleteProtectedNodeInBackground.debugInsertError.set(true); try { createBuilder.withProtection().forPath("/parent/test"); http://git-wip-us.apache.org/repos/asf/curator/blob/9b68e19a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java index 3b63956..3100fde 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java @@ -583,6 +583,7 @@ public class DistributedQueue implements QueueBase final Semaphore processedLatch = new Semaphore(0); final boolean isUsingLockSafety = (lockPath != null); int min = minItemsBeforeRefresh; + int submittedQty = 0; for ( final String itemNode : children ) { if ( Thread.currentThread().isInterrupted() ) @@ -602,7 +603,6 @@ public class DistributedQueue implements QueueBase { if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) ) { - processedLatch.release(children.size()); break; } } @@ -642,9 +642,10 @@ public class DistributedQueue implements QueueBase } } ); + ++submittedQty; } - processedLatch.acquire(children.size()); + processedLatch.acquire(submittedQty); } private enum ProcessMessageBytesCode