curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject git commit: Major bug! If the initial connection never succeeded, ConnectionState.CONNECTED would get sent when ConnectionState.LOST was intended
Date Thu, 26 Sep 2013 17:54:28 GMT
Updated Branches:
  refs/heads/CURATOR-59 [created] 01b6066fd


Major bug! If the initial connection never succeeded, ConnectionState.CONNECTED would get
sent when ConnectionState.LOST was intended


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/01b6066f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/01b6066f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/01b6066f

Branch: refs/heads/CURATOR-59
Commit: 01b6066fd344b490b338768d657da71239086499
Parents: 9460a46
Author: randgalt <randgalt@apache.org>
Authored: Thu Sep 26 10:54:16 2013 -0700
Committer: randgalt <randgalt@apache.org>
Committed: Thu Sep 26 10:54:16 2013 -0700

----------------------------------------------------------------------
 .../framework/state/ConnectionStateManager.java | 82 +++++++++++---------
 .../framework/imps/TestNeverConnected.java      | 63 +++++++++++++++
 2 files changed, 109 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/01b6066f/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 2ed60c1..fe5f18a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.state;
 
 import com.google.common.base.Function;
@@ -32,6 +33,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -39,11 +41,12 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class ConnectionStateManager implements Closeable
 {
-    private static final int   QUEUE_SIZE;
+    private static final int QUEUE_SIZE;
+
     static
     {
-        int         size = 25;
-        String      property = System.getProperty("ConnectionStateManagerSize", null);
+        int size = 25;
+        String property = System.getProperty("ConnectionStateManagerSize", null);
         if ( property != null )
         {
             try
@@ -58,13 +61,14 @@ public class ConnectionStateManager implements Closeable
         QUEUE_SIZE = size;
     }
 
-    private final Logger                                        log = LoggerFactory.getLogger(getClass());
-    private final BlockingQueue<ConnectionState>                eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
-    private final CuratorFramework                              client;
-    private final ListenerContainer<ConnectionStateListener>    listeners = new ListenerContainer<ConnectionStateListener>();
-    private final AtomicReference<ConnectionState>              currentState = new AtomicReference<ConnectionState>();
-    private final ExecutorService                               service;
-    private final AtomicReference<State>                        state = new AtomicReference<State>(State.LATENT);
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
+    private final CuratorFramework client;
+    private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
+    private final AtomicReference<ConnectionState> currentState = new AtomicReference<ConnectionState>();
+    private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
+    private final ExecutorService service;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
 
     private enum State
     {
@@ -74,7 +78,7 @@ public class ConnectionStateManager implements Closeable
     }
 
     /**
-     * @param client the client
+     * @param client        the client
      * @param threadFactory thread factory to use or null for a default
      */
     public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory)
@@ -90,22 +94,22 @@ public class ConnectionStateManager implements Closeable
     /**
      * Start the manager
      */
-    public void     start()
+    public void start()
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
         service.submit
-        (
-            new Callable<Object>()
-            {
-                @Override
-                public Object call() throws Exception
+            (
+                new Callable<Object>()
                 {
-                    processEvents();
-                    return null;
+                    @Override
+                    public Object call() throws Exception
+                    {
+                        processEvents();
+                        return null;
+                    }
                 }
-            }
-        );
+            );
     }
 
     @Override
@@ -132,22 +136,28 @@ public class ConnectionStateManager implements Closeable
      * Post a state change. If the manager is already in that state the change
      * is ignored. Otherwise the change is queued for listeners.
      *
-     * @param newState new state
+     * @param newConnectionState new state
      */
-    public void addStateChange(ConnectionState newState)
+    public void addStateChange(ConnectionState newConnectionState)
     {
         if ( state.get() != State.STARTED )
         {
             return;
         }
 
-        ConnectionState     previousState = currentState.getAndSet(newState);
-        if ( previousState == newState )
+        ConnectionState previousState = currentState.getAndSet(newConnectionState);
+        if ( previousState == newConnectionState )
         {
             return;
         }
 
-        ConnectionState     localState = (previousState == null) ? ConnectionState.CONNECTED : newState;
+        ConnectionState localState = newConnectionState;
+        boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED));
+        if ( !isNegativeMessage && initialConnectMessageSent.compareAndSet(false, true) )
+        {
+            localState = ConnectionState.CONNECTED;
+        }
+
         log.info("State change: " + localState);
         while ( !eventQueue.offer(localState) )
         {
@@ -162,7 +172,7 @@ public class ConnectionStateManager implements Closeable
         {
             while ( !Thread.currentThread().isInterrupted() )
             {
-                final ConnectionState    newState = eventQueue.take();
+                final ConnectionState newState = eventQueue.take();
 
                 if ( listeners.size() == 0 )
                 {
@@ -170,17 +180,17 @@ public class ConnectionStateManager implements Closeable
                 }
 
                 listeners.forEach
-                (
-                    new Function<ConnectionStateListener, Void>()
-                    {
-                        @Override
-                        public Void apply(ConnectionStateListener listener)
+                    (
+                        new Function<ConnectionStateListener, Void>()
                         {
-                            listener.stateChanged(client, newState);
-                            return null;
+                            @Override
+                            public Void apply(ConnectionStateListener listener)
+                            {
+                                listener.stateChanged(client, newState);
+                                return null;
+                            }
                         }
-                    }
-                );
+                    );
             }
         }
         catch ( InterruptedException e )

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/01b6066f/curator-framework/src/test/java/org/apache/curator/framework/imps/TestNeverConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestNeverConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestNeverConnected.java
new file mode 100644
index 0000000..49ac850
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestNeverConnected.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import com.google.common.collect.Queues;
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+
+public class TestNeverConnected
+{
+    @Test
+    public void testNeverConnected() throws Exception
+    {
+        // use a connection string to a non-existent server
+        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:1111", 100, 100, new RetryOneTime(1));
+        try
+        {
+            final BlockingQueue<ConnectionState> queue = Queues.newLinkedBlockingQueue();
+            ConnectionStateListener listener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState state)
+                {
+                    queue.add(state);
+                }
+            };
+            client.getConnectionStateListenable().addListener(listener);
+            client.start();
+
+            client.create().inBackground().forPath("/");
+
+            Assert.assertEquals(queue.take(), ConnectionState.LOST);
+        }
+        finally
+        {
+            Closeables.closeQuietly(client);
+        }
+    }
+}


Mime
View raw message