curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [9/9] curator git commit: Merge branch 'master' into CURATOR-3.0
Date Mon, 18 Jan 2016 00:30:00 GMT
Merge branch 'master' into CURATOR-3.0

Conflicts:
	curator-client/src/main/java/org/apache/curator/RetryLoop.java
	curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
	curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
	curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
	curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
	curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
	curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9a03ea93
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9a03ea93
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9a03ea93

Branch: refs/heads/CURATOR-3.0
Commit: 9a03ea93937af047e8ad13c2e3e3559520abfb0a
Parents: 24aa3c3 36a72d9
Author: randgalt <randgalt@apache.org>
Authored: Sun Jan 17 17:40:52 2016 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Sun Jan 17 17:40:52 2016 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |  4 ++
 .../apache/curator/CuratorZookeeperClient.java  |  2 +
 .../apache/curator/SessionFailRetryLoop.java    |  2 +
 .../ClassicConnectionHandlingPolicy.java        |  2 +
 .../StandardConnectionHandlingPolicy.java       |  2 +
 .../exhibitor/ExhibitorEnsembleProvider.java    |  2 +
 .../org/apache/curator/utils/ThreadUtils.java   | 23 ++++++++++
 .../src/main/java/locking/LockingExample.java   |  7 ++-
 .../curator/framework/imps/Backgrounding.java   |  2 +
 .../framework/imps/CreateBuilderImpl.java       |  3 ++
 .../framework/imps/CuratorFrameworkImpl.java    | 46 +++++++++++++-------
 .../framework/imps/DeleteBuilderImpl.java       |  2 +
 .../framework/imps/FailedOperationManager.java  |  2 +
 .../FindAndDeleteProtectedNodeInBackground.java |  3 ++
 .../framework/imps/GetDataBuilderImpl.java      |  2 +
 .../curator/framework/imps/NamespaceImpl.java   |  2 +
 .../framework/imps/NamespaceWatcher.java        |  2 +
 .../framework/imps/OperationAndData.java        | 13 ++++--
 .../framework/listen/ListenerContainer.java     |  2 +
 .../framework/state/ConnectionStateManager.java | 14 +++---
 .../recipes/AfterConnectionEstablished.java     |  1 +
 .../framework/recipes/cache/NodeCache.java      |  4 ++
 .../recipes/cache/PathChildrenCache.java        |  4 ++
 .../framework/recipes/cache/TreeCache.java      |  7 +++
 .../framework/recipes/leader/LeaderLatch.java   |  5 +++
 .../recipes/leader/LeaderSelector.java          | 10 ++---
 .../framework/recipes/locks/ChildReaper.java    |  1 +
 .../recipes/locks/InterProcessMultiLock.java    |  4 ++
 .../recipes/locks/InterProcessSemaphore.java    |  4 ++
 .../recipes/locks/InterProcessSemaphoreV2.java  |  2 +
 .../framework/recipes/locks/LockInternals.java  |  2 +
 .../curator/framework/recipes/locks/Reaper.java |  1 +
 .../framework/recipes/nodes/GroupMember.java    |  3 ++
 .../recipes/nodes/PersistentEphemeralNode.java  |  3 ++
 .../recipes/queue/DistributedQueue.java         | 43 ++++++++++--------
 .../framework/recipes/queue/QueueSharder.java   | 16 ++++---
 .../framework/recipes/shared/SharedValue.java   |  2 +
 ...estResetConnectionWithBackgroundFailure.java | 37 +++++++---------
 .../curator/test/TestingZooKeeperMain.java      | 24 ++++++++--
 .../entity/JsonServiceInstanceMarshaller.java   |  3 ++
 .../entity/JsonServiceInstancesMarshaller.java  |  2 +
 .../server/rest/DiscoveryResource.java          |  6 +++
 .../discovery/server/rest/InstanceCleanup.java  |  2 +
 .../discovery/details/ServiceDiscoveryImpl.java |  3 ++
 .../x/rpc/idl/discovery/DiscoveryService.java   |  8 ++++
 .../idl/discovery/DiscoveryServiceLowLevel.java |  7 +++
 .../idl/services/CuratorProjectionService.java  | 25 +++++++++++
 47 files changed, 286 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --cc curator-client/src/main/java/org/apache/curator/ConnectionState.java
index 0b21643,dc6ac53..d79ec58
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@@ -18,11 -18,11 +18,12 @@@
   */
  package org.apache.curator;
  
 -import org.apache.curator.utils.CloseableUtils;
 +import org.apache.curator.connection.ConnectionHandlingPolicy;
  import org.apache.curator.drivers.TracerDriver;
  import org.apache.curator.ensemble.EnsembleProvider;
 +import org.apache.curator.utils.CloseableUtils;
  import org.apache.curator.utils.DebugUtils;
+ import org.apache.curator.utils.ThreadUtils;
  import org.apache.curator.utils.ZookeeperFactory;
  import org.apache.zookeeper.KeeperException;
  import org.apache.zookeeper.WatchedEvent;

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-client/src/main/java/org/apache/curator/SessionFailRetryLoop.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --cc curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
index f620ffb,0000000..fe24b42
mode 100644,000000..100644
--- a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
+++ b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
@@@ -1,86 -1,0 +1,88 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.curator.connection;
 +
 +import org.apache.curator.CuratorZookeeperClient;
 +import org.apache.curator.RetryLoop;
++import org.apache.curator.utils.ThreadUtils;
 +import java.util.concurrent.Callable;
 +
 +/**
 + * Emulates the pre 3.0.0 Curator connection handling
 + */
 +public class ClassicConnectionHandlingPolicy implements ConnectionHandlingPolicy
 +{
 +    @Override
 +    public int getSimulatedSessionExpirationPercent()
 +    {
 +        return 0;
 +    }
 +
 +    @Override
 +    public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
 +    {
 +        T result = null;
 +        RetryLoop retryLoop = client.newRetryLoop();
 +        while ( retryLoop.shouldContinue() )
 +        {
 +            try
 +            {
 +                client.internalBlockUntilConnectedOrTimedOut();
 +                result = proc.call();
 +                retryLoop.markComplete();
 +            }
 +            catch ( Exception e )
 +            {
++                ThreadUtils.checkInterrupted(e);
 +                retryLoop.takeException(e);
 +            }
 +        }
 +
 +        return result;
 +    }
 +
 +    @Override
 +    public CheckTimeoutsResult checkTimeouts(Callable<String> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
 +    {
 +        CheckTimeoutsResult result = CheckTimeoutsResult.NOP;
 +        int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
 +        long elapsed = System.currentTimeMillis() - connectionStartMs;
 +        if ( elapsed >= minTimeout )
 +        {
 +            if ( hasNewConnectionString.call() != null )
 +            {
 +                result = CheckTimeoutsResult.NEW_CONNECTION_STRING;
 +            }
 +            else
 +            {
 +                int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs);
 +                if ( elapsed > maxTimeout )
 +                {
 +                    result = CheckTimeoutsResult.RESET_CONNECTION;
 +                }
 +                else
 +                {
 +                    result = CheckTimeoutsResult.CONNECTION_TIMEOUT;
 +                }
 +            }
 +        }
 +
 +        return result;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --cc curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
index 6995815,0000000..8f7a438
mode 100644,000000..100644
--- a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
+++ b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
@@@ -1,87 -1,0 +1,89 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.curator.connection;
 +
 +import com.google.common.base.Preconditions;
 +import org.apache.curator.CuratorZookeeperClient;
 +import org.apache.curator.RetryLoop;
++import org.apache.curator.utils.ThreadUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import java.util.concurrent.Callable;
 +
 +/**
 + * Curator's standard connection handling since 3.0.0
 + *
 + * @since 3.0.0
 + */
 +public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolicy
 +{
 +    private final Logger log = LoggerFactory.getLogger(getClass());
 +    private final int expirationPercent;
 +
 +    public StandardConnectionHandlingPolicy()
 +    {
 +        this(100);
 +    }
 +
 +    public StandardConnectionHandlingPolicy(int expirationPercent)
 +    {
 +        Preconditions.checkArgument((expirationPercent > 0) && (expirationPercent <= 100), "expirationPercent must be > 0 and <= 100");
 +        this.expirationPercent = expirationPercent;
 +    }
 +
 +    @Override
 +    public int getSimulatedSessionExpirationPercent()
 +    {
 +        return expirationPercent;
 +    }
 +
 +    @Override
 +    public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
 +    {
 +        client.internalBlockUntilConnectedOrTimedOut();
 +
 +        T result = null;
 +        RetryLoop retryLoop = client.newRetryLoop();
 +        while ( retryLoop.shouldContinue() )
 +        {
 +            try
 +            {
 +                result = proc.call();
 +                retryLoop.markComplete();
 +            }
 +            catch ( Exception e )
 +            {
++                ThreadUtils.checkInterrupted(e);
 +                retryLoop.takeException(e);
 +            }
 +        }
 +
 +        return result;
 +    }
 +
 +    @Override
 +    public CheckTimeoutsResult checkTimeouts(Callable<String> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
 +    {
 +        if ( hasNewConnectionString.call() != null )
 +        {
 +            return CheckTimeoutsResult.NEW_CONNECTION_STRING;
 +        }
 +        return CheckTimeoutsResult.NOP;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index 5622508,eeb057d..ada4bae
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@@ -25,8 -25,10 +25,9 @@@ import com.google.common.collect.Iterab
  import org.apache.curator.RetryLoop;
  import org.apache.curator.TimeTrace;
  import org.apache.curator.framework.api.*;
 -import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
  import org.apache.curator.framework.api.transaction.OperationType;
  import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
+ import org.apache.curator.utils.ThreadUtils;
  import org.apache.curator.utils.ZKPaths;
  import org.apache.zookeeper.AsyncCallback;
  import org.apache.zookeeper.CreateMode;

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 10bed18,c3247a1..ab72308
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@@ -27,10 -27,11 +27,11 @@@ import org.apache.curator.framework.api
  import org.apache.curator.framework.api.CuratorEvent;
  import org.apache.curator.framework.api.CuratorEventType;
  import org.apache.curator.framework.api.DeleteBuilder;
 +import org.apache.curator.framework.api.DeleteBuilderMain;
  import org.apache.curator.framework.api.Pathable;
 -import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
  import org.apache.curator.framework.api.transaction.OperationType;
  import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder;
+ import org.apache.curator.utils.ThreadUtils;
  import org.apache.curator.utils.ZKPaths;
  import org.apache.zookeeper.AsyncCallback;
  import org.apache.zookeeper.KeeperException;

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
index 405561b,0000000..c09e2ec
mode 100644,000000..100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
@@@ -1,68 -1,0 +1,70 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.curator.framework.imps;
 +
 +import org.apache.curator.framework.CuratorFramework;
++import org.apache.curator.utils.ThreadUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +
 +abstract class FailedOperationManager<T>
 +{
 +    private final Logger log = LoggerFactory.getLogger(getClass());
 +    protected final CuratorFramework client;
 +    
 +    @VisibleForTesting
 +    volatile FailedOperationManagerListener<T> debugListener = null;
 +    
 +    interface FailedOperationManagerListener<T>
 +    {
 +       public void pathAddedForGuaranteedOperation(T detail);
 +    }
 +
 +    FailedOperationManager(CuratorFramework client)
 +    {
 +        this.client = client;
 +    }
 +
 +    void addFailedOperation(T details)
 +    {
 +        if ( debugListener != null )
 +        {
 +            debugListener.pathAddedForGuaranteedOperation(details);
 +        }
 +        
 +        
 +        if ( client.getState() == CuratorFrameworkState.STARTED )
 +        {
 +            log.debug("Details being added to guaranteed operation set: " + details);
 +            try
 +            {
 +                executeGuaranteedOperationInBackground(details);
 +            }
 +            catch ( Exception e )
 +            {
++                ThreadUtils.checkInterrupted(e);
 +                addFailedOperation(details);
 +            }
 +        }
 +    }
 +    
 +    protected abstract void executeGuaranteedOperationInBackground(T details) throws Exception;
 +}

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index b46cddb,279eece..5f7b985
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@@ -38,9 -38,8 +38,9 @@@ class OperationAndData<T> implements De
      private final ErrorCallback<T> errorCallback;
      private final AtomicInteger retryCount = new AtomicInteger(0);
      private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
-     private final long ordinal = nextOrdinal.getAndIncrement();
+     private final AtomicLong ordinal = new AtomicLong();
      private final Object context;
 +    private final boolean connectionRequired;
  
      interface ErrorCallback<T>
      {
@@@ -54,14 -53,15 +54,21 @@@
          this.callback = callback;
          this.errorCallback = errorCallback;
          this.context = context;
 +        this.connectionRequired = connectionRequired;
-     }      
+         reset();
+     }
+ 
+     void reset()
+     {
+         retryCount.set(0);
+         ordinal.set(nextOrdinal.getAndIncrement());
+     }
  
 +    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
 +    {
 +        this(operation, data, callback, errorCallback, context, true);
 +    }
 +
      Object getContext()
      {
          return context;

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index b6f2e02,8cc37aa..cbb8d16
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@@ -246,78 -238,36 +246,80 @@@ public class ConnectionStateManager imp
  
      private void processEvents()
      {
-         try
+         while ( state.get() == State.STARTED )
          {
-             while ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) )
+             try
              {
 -                final ConnectionState newState = eventQueue.take();
 -
 -                if ( listeners.size() == 0 )
 +                int lastNegotiatedSessionTimeoutMs = client.getZookeeperClient().getLastNegotiatedSessionTimeoutMs();
 +                int useSessionTimeoutMs = (lastNegotiatedSessionTimeoutMs > 0) ? lastNegotiatedSessionTimeoutMs : sessionTimeoutMs;
 +                int pollMaxMs = (useSessionTimeoutMs * 2) / 3; // 2/3 of session timeout
 +                final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
 +                if ( newState != null )
                  {
 -                    log.warn("There are no ConnectionStateListeners registered.");
 -                }
 +                    if ( listeners.size() == 0 )
 +                    {
 +                        log.warn("There are no ConnectionStateListeners registered.");
 +                    }
  
 -                listeners.forEach
 -                    (
 -                        new Function<ConnectionStateListener, Void>()
 -                        {
 -                            @Override
 -                            public Void apply(ConnectionStateListener listener)
 +                    listeners.forEach
 +                        (
 +                            new Function<ConnectionStateListener, Void>()
                              {
 -                                listener.stateChanged(client, newState);
 -                                return null;
 +                                @Override
 +                                public Void apply(ConnectionStateListener listener)
 +                                {
 +                                    listener.stateChanged(client, newState);
 +                                    return null;
 +                                }
                              }
 -                        }
 -                    );
 +                        );
 +                }
 +                else if ( sessionExpirationPercent > 0 )
 +                {
 +                    synchronized(this)
 +                    {
 +                        checkSessionExpiration();
 +                    }
 +                }
              }
-         }
-         catch ( InterruptedException e )
-         {
-             Thread.currentThread().interrupt();
+             catch ( InterruptedException e )
+             {
+                 // swallow the interrupt as it's only possible from either a background
+                 // operation and, thus, doesn't apply to this loop or the instance
+                 // is being closed in which case the while test will get it
+             }
          }
      }
 +
 +    private void checkSessionExpiration()
 +    {
 +        if ( (currentConnectionState == ConnectionState.SUSPENDED) && (startOfSuspendedEpoch != 0) )
 +        {
 +            long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch;
 +            int lastNegotiatedSessionTimeoutMs = client.getZookeeperClient().getLastNegotiatedSessionTimeoutMs();
 +            int useSessionTimeoutMs = (lastNegotiatedSessionTimeoutMs > 0) ? lastNegotiatedSessionTimeoutMs : sessionTimeoutMs;
 +            useSessionTimeoutMs = (useSessionTimeoutMs * sessionExpirationPercent) / 100;
 +            if ( elapsedMs >= useSessionTimeoutMs )
 +            {
 +                log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session timeout ms: %d", elapsedMs, useSessionTimeoutMs));
 +                try
 +                {
 +                    // LOL - this method was proposed by me (JZ) in 2013 for totally unrelated reasons
 +                    // it got added to ZK 3.5 and now does exactly what we need
 +                    // https://issues.apache.org/jira/browse/ZOOKEEPER-1730
 +                    client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
 +                }
 +                catch ( Exception e )
 +                {
 +                    log.error("Could not inject session expiration", e);
 +                }
 +            }
 +        }
 +    }
 +
 +    private void setCurrentConnectionState(ConnectionState newConnectionState)
 +    {
 +        currentConnectionState = newConnectionState;
 +        startOfSuspendedEpoch = (currentConnectionState == ConnectionState.SUSPENDED) ? System.currentTimeMillis() : 0;
 +    }
  }

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 3bf2ec3,8524075..36dbff4
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@@ -30,8 -31,7 +30,9 @@@ import org.apache.curator.framework.imp
  import org.apache.curator.framework.recipes.shared.SharedCountListener;
  import org.apache.curator.framework.recipes.shared.SharedCountReader;
  import org.apache.curator.framework.state.ConnectionState;
 +import org.apache.curator.utils.CloseableUtils;
 +import org.apache.curator.utils.PathUtils;
+ import org.apache.curator.utils.ThreadUtils;
  import org.apache.curator.utils.ZKPaths;
  import org.apache.zookeeper.CreateMode;
  import org.apache.zookeeper.KeeperException;

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
index 4b0da11,f712945..dc2f681
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
@@@ -28,7 -27,9 +28,8 @@@ import org.apache.curator.framework.Wat
  import org.apache.curator.framework.api.CuratorWatcher;
  import org.apache.curator.framework.imps.CuratorFrameworkState;
  import org.apache.curator.utils.PathUtils;
+ import org.apache.curator.utils.ThreadUtils;
  import org.apache.curator.utils.ZKPaths;
 -import org.apache.zookeeper.CreateMode;
  import org.apache.zookeeper.KeeperException;
  import org.apache.zookeeper.WatchedEvent;
  import org.apache.zookeeper.Watcher;

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 881cde7,cc1159a..c5e9cc2
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@@ -329,10 -326,9 +330,11 @@@ public class PersistentEphemeralNode im
          }
          catch ( Exception e )
          {
+             ThreadUtils.checkInterrupted(e);
              throw new IOException(e);
          }
 +
 +        client.removeWatchers();
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/9a03ea93/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
----------------------------------------------------------------------
diff --cc curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
index b2389e9,7487557..19535a6
--- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
@@@ -27,35 -23,41 +27,53 @@@ import org.apache.zookeeper.server.Serv
  import org.apache.zookeeper.server.ServerConfig;
  import org.apache.zookeeper.server.ZKDatabase;
  import org.apache.zookeeper.server.ZooKeeperServer;
 -import org.apache.zookeeper.server.ZooKeeperServerMain;
 +import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
  import org.apache.zookeeper.server.quorum.QuorumPeer;
  import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import javax.management.JMException;
  import java.io.IOException;
  import java.lang.reflect.Field;
- import java.lang.reflect.Modifier;
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
  import java.nio.channels.ServerSocketChannel;
  import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicReference;
  
 -public class TestingZooKeeperMain extends ZooKeeperServerMain implements ZooKeeperMainFace
 +public class TestingZooKeeperMain implements ZooKeeperMainFace
  {
 +    private static final Logger log = LoggerFactory.getLogger(TestingZooKeeperMain.class);
 +
      private final CountDownLatch latch = new CountDownLatch(1);
      private final AtomicReference<Exception> startingException = new AtomicReference<Exception>(null);
  
 -    private static final int MAX_WAIT_MS;
 +    private volatile ServerCnxnFactory cnxnFactory;
 +    private volatile TestZooKeeperServer zkServer;
 +    private volatile ContainerManager containerManager;
 +
 +    private static final Timing timing = new Timing();
  
-     static final int MAX_WAIT_MS = timing.milliseconds();
++    static final int MAX_WAIT_MS;
+     static
+     {
+         long startMs = System.currentTimeMillis();
+         try
+         {
+             // this can take forever and fails tests - ZK calls it internally so there's nothing we can do
+             // pre flight it and use it to calculate max wait
+             //noinspection ResultOfMethodCallIgnored
+             InetAddress.getLocalHost().getCanonicalHostName();
+         }
+         catch ( UnknownHostException e )
+         {
+             // ignore
+         }
+         long elapsed = System.currentTimeMillis() - startMs;
+         MAX_WAIT_MS = Math.max((int)elapsed * 2, 1000);
+     }
  
      @Override
      public void kill()
@@@ -195,95 -169,39 +213,95 @@@
          {
              e.printStackTrace();    // just ignore - this class is only for testing
          }
 +        finally
 +        {
 +            zkServer = null;
 +        }
      }
  
 -    private ServerCnxnFactory getServerConnectionFactory() throws Exception
 +    // copied from ZooKeeperServerMain.java
 +    private void internalRunFromConfig(ServerConfig config) throws IOException
      {
 -        Field cnxnFactoryField = ZooKeeperServerMain.class.getDeclaredField("cnxnFactory");
 -        cnxnFactoryField.setAccessible(true);
 -        ServerCnxnFactory cnxnFactory;
 +        log.info("Starting server");
 +        FileTxnSnapLog txnLog = null;
 +        try {
 +            // Note that this thread isn't going to be doing anything else,
 +            // so rather than spawning another thread, we will just call
 +            // run() in this thread.
 +            // create a file logger url from the command line args
 +            txnLog = new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir());
 +            zkServer = new TestZooKeeperServer(txnLog, config);
  
 -        // Wait until the cnxnFactory field is non-null or up to 1s, whichever comes first.
 -        long startTime = System.currentTimeMillis();
 -        do
 -        {
 -            cnxnFactory = (ServerCnxnFactory)cnxnFactoryField.get(this);
 +            try
 +            {
 +                cnxnFactory = ServerCnxnFactory.createFactory();
 +                cnxnFactory.configure(config.getClientPortAddress(),
 +                    config.getMaxClientCnxns());
 +            }
 +            catch ( IOException e )
 +            {
 +                log.info("Could not server. Waiting and trying one more time.", e);
 +                timing.sleepABit();
 +                cnxnFactory = ServerCnxnFactory.createFactory();
 +                cnxnFactory.configure(config.getClientPortAddress(),
 +                    config.getMaxClientCnxns());
 +            }
 +            cnxnFactory.startup(zkServer);
-             containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.getFirstProcessor(), Integer.getInteger("znode.container.checkIntervalMs", (int)TimeUnit.MINUTES.toMillis(1L)).intValue(), Integer.getInteger("znode.container.maxPerMinute", 10000).intValue());
++            containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.getFirstProcessor(), Integer.getInteger("znode.container.checkIntervalMs", (int)TimeUnit.MINUTES.toMillis(1L)), Integer.getInteger("znode.container.maxPerMinute", 10000));
 +            containerManager.start();
 +            latch.countDown();
 +            cnxnFactory.join();
 +            if ( (zkServer != null) && zkServer.isRunning()) {
 +                zkServer.shutdown();
 +            }
 +        } catch (InterruptedException e) {
 +            // warn, but generally this is ok
 +            Thread.currentThread().interrupt();
 +            log.warn("Server interrupted", e);
 +        } finally {
 +            if (txnLog != null) {
 +                txnLog.close();
 +            }
          }
 -        while ( (cnxnFactory == null) && ((System.currentTimeMillis() - startTime) < MAX_WAIT_MS) );
 -
 -        return cnxnFactory;
      }
  
 -    private ZooKeeperServer getZooKeeperServer(ServerCnxnFactory cnxnFactory) throws Exception
 +    public static class TestZooKeeperServer extends ZooKeeperServer
      {
 -        Field zkServerField = ServerCnxnFactory.class.getDeclaredField("zkServer");
 -        zkServerField.setAccessible(true);
 -        ZooKeeperServer zkServer;
 +        public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config)
 +        {
 +            super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), null);
 +        }
  
 -        // Wait until the zkServer field is non-null or up to 1s, whichever comes first.
 -        long startTime = System.currentTimeMillis();
 -        do
 +        private final AtomicBoolean isRunning = new AtomicBoolean(false);
 +
 +        public RequestProcessor getFirstProcessor()
 +        {
 +            return firstProcessor;
 +        }
 +
 +        protected void registerJMX()
 +        {
 +            // NOP
 +        }
 +
 +        @Override
 +        protected void unregisterJMX()
          {
 -            zkServer = (ZooKeeperServer)zkServerField.get(cnxnFactory);
 +            // NOP
          }
 -        while ( (zkServer == null) && ((System.currentTimeMillis() - startTime) < MAX_WAIT_MS) );
  
 -        return zkServer;
 +        @Override
 +        public boolean isRunning()
 +        {
 +            return isRunning.get() || super.isRunning();
 +        }
 +
 +        public void noteStartup()
 +        {
 +            synchronized (this) {
 +                isRunning.set(true);
 +                this.notifyAll();
 +            }
 +        }
      }
  }


Mime
View raw message