curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [47/50] [abbrv] git commit: wip
Date Mon, 10 Jun 2013 15:01:32 GMT
wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/cd62a3dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/cd62a3dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/cd62a3dd

Branch: refs/heads/initialize
Commit: cd62a3dd50d419da42b37c389aac87eb96898456
Parents: ffc8d05
Author: Jordan Zimmerman <jordan@jordanzimmerman.com>
Authored: Mon Jan 14 15:13:49 2013 -0800
Committer: Jordan Zimmerman <jordan@jordanzimmerman.com>
Committed: Mon Jan 14 15:13:49 2013 -0800

----------------------------------------------------------------------
 .../recipes/cache/ForceRefreshOperation.java    |  57 ---------
 .../recipes/cache/PathChildrenCache.java        | 127 +++++++++++++++----
 .../recipes/cache/PathChildrenCacheEvent.java   |  14 +-
 .../recipes/cache/RefreshOperation.java         |   8 +-
 .../recipes/cache/TestPathChildrenCache.java    |  67 +++++++++-
 5 files changed, 179 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/cd62a3dd/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/ForceRefreshOperation.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/ForceRefreshOperation.java
b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/ForceRefreshOperation.java
deleted file mode 100644
index e56dade..0000000
--- a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/ForceRefreshOperation.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2012 Netflix, Inc.
- *
- *    Licensed under the Apache License, Version 2.0 (the "License");
- *    you may not use this file except in compliance with the License.
- *    You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- */
-
-package com.netflix.curator.framework.recipes.cache;
-
-class ForceRefreshOperation implements Operation
-{
-    private final PathChildrenCache cache;
-
-    ForceRefreshOperation(PathChildrenCache cache)
-    {
-        this.cache = cache;
-    }
-
-    @Override
-    public void invoke() throws Exception
-    {
-        cache.refresh(true);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return ForceRefreshOperation.class.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-        //noinspection SimplifiableIfStatement
-        if ( obj == null )
-        {
-            return false;
-        }
-
-        return (this == obj) || (getClass().equals(obj.getClass()));
-    }
-
-    @Override
-    public String toString()
-    {
-        return "ForceRefreshOperation{}";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/cd62a3dd/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCache.java
b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCache.java
index 357bcca..0710ab6 100644
--- a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCache.java
@@ -72,12 +72,14 @@ public class PathChildrenCache implements Closeable
     private final ListenerContainer<PathChildrenCacheListener>  listeners = new ListenerContainer<PathChildrenCacheListener>();
     private final ConcurrentMap<String, ChildData>              currentData = Maps.newConcurrentMap();
 
+    private volatile Set<String>                  initialSet = null;
+
     private final Watcher     childrenWatcher = new Watcher()
     {
         @Override
         public void process(WatchedEvent event)
         {
-            offerOperation(new RefreshOperation(PathChildrenCache.this));
+            offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
         }
     };
 
@@ -189,7 +191,7 @@ public class PathChildrenCache implements Closeable
      */
     public void     start() throws Exception
     {
-        start(false);
+        start(StartMode.STANDARD);
     }
 
     /**
@@ -198,32 +200,73 @@ public class PathChildrenCache implements Closeable
      * @param buildInitial if true, {@link #rebuild()} will be called before this method
      *                     returns in order to get an initial view of the node
      * @throws Exception errors
+     * @deprecated use {@link #start(StartMode)}
      */
     public void     start(boolean buildInitial) throws Exception
     {
+        start(buildInitial ? StartMode.BUILD_INITIAL : StartMode.STANDARD);
+    }
+
+    /**
+     * How to load initial data when calling start()
+     */
+    public enum StartMode
+    {
+        /**
+         * Load data in the background - start() will return immediately
+         */
+        STANDARD,
+
+        /**
+         * Load initial data set in the foreground. start() will block until all initial data
is loaded
+         */
+        BUILD_INITIAL,
+
+        /**
+         * Load data in the background and post {@link PathChildrenCacheEvent.Type#INITIALIZED}
when done
+         */
+        POST_INITIALIZED_EVENT
+    }
+
+    public void     start(StartMode mode) throws Exception
+    {
+        mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+
         Preconditions.checkState(!executorService.isShutdown(), "already started");
 
         client.getConnectionStateListenable().addListener(connectionStateListener);
         executorService.submit
-            (
-                new Callable<Void>()
+        (
+            new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
                 {
-                    @Override
-                    public Void call() throws Exception
-                    {
-                        mainLoop();
-                        return null;
-                    }
+                    mainLoop();
+                    return null;
                 }
-            );
+            }
+        );
 
-        if ( buildInitial )
+        switch ( mode )
         {
-            rebuild();
-        }
-        else
-        {
-            offerOperation(new RefreshOperation(this));
+            case STANDARD:
+            {
+                offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
+                break;
+            }
+
+            case BUILD_INITIAL:
+            {
+                rebuild();
+                break;
+            }
+
+            case POST_INITIALIZED_EVENT:
+            {
+                offerOperation(new RefreshOperation(this, RefreshMode.POST_CHILDREN_INITIALIZED_EVENT));
+                break;
+            }
         }
     }
 
@@ -252,7 +295,7 @@ public class PathChildrenCache implements Closeable
         }
 
         // this is necessary so that any updates that occurred while rebuilding are taken
-        offerOperation(new ForceRefreshOperation(this));
+        offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
     }
 
     /**
@@ -272,7 +315,7 @@ public class PathChildrenCache implements Closeable
 
         // this is necessary so that any updates that occurred while rebuilding are taken
         // have to rebuild entire tree in case this node got deleted in the interim
-        offerOperation(new ForceRefreshOperation(this));
+        offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
     }
 
     /**
@@ -364,7 +407,7 @@ public class PathChildrenCache implements Closeable
     public void clearAndRefresh() throws Exception
     {
         currentData.clear();
-        offerOperation(new RefreshOperation(this));
+        offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
     }
 
     /**
@@ -376,7 +419,14 @@ public class PathChildrenCache implements Closeable
         currentData.clear();
     }
 
-    void refresh(final boolean forceGetDataAndStat) throws Exception
+    enum RefreshMode
+    {
+        STANDARD,
+        FORCE_GET_DATA_AND_STAT,
+        POST_CHILDREN_INITIALIZED_EVENT
+    }
+
+    void refresh(final RefreshMode mode) throws Exception
     {
         ensurePath.ensure(client.getZookeeperClient());
 
@@ -385,7 +435,7 @@ public class PathChildrenCache implements Closeable
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
             {
-                processChildren(event.getChildren(), forceGetDataAndStat);
+                processChildren(event.getChildren(), mode);
             }
         };
 
@@ -507,7 +557,7 @@ public class PathChildrenCache implements Closeable
         {
             try
             {
-                offerOperation(new ForceRefreshOperation(this));
+                offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
                 offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED,
null)));
             }
             catch ( Exception e )
@@ -519,7 +569,7 @@ public class PathChildrenCache implements Closeable
         }
     }
 
-    private void processChildren(List<String> children, boolean forceGetDataAndStat)
throws Exception
+    private void processChildren(List<String> children, RefreshMode mode) throws Exception
     {
         List<String>    fullPaths = Lists.transform
         (
@@ -541,13 +591,39 @@ public class PathChildrenCache implements Closeable
             remove(fullPath);
         }
 
+        if ( mode == RefreshMode.POST_CHILDREN_INITIALIZED_EVENT )
+        {
+            initialSet = Sets.newHashSet(children);
+            updateInitialSet(null);
+        }
+
         for ( String name : children )
         {
             String      fullPath = ZKPaths.makePath(path, name);
-            if ( forceGetDataAndStat || !currentData.containsKey(fullPath) )
+            if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath)
)
             {
                 getDataAndStat(fullPath);
             }
+            else
+            {
+                updateInitialSet(name);
+            }
+        }
+    }
+
+    private void updateInitialSet(String child)
+    {
+        if ( initialSet != null )
+        {
+            if ( child != null )
+            {
+                initialSet.remove(child);
+            }
+            if ( initialSet.size() == 0 )
+            {
+                offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED,
null)));
+                initialSet = null;
+            }
         }
     }
 
@@ -574,6 +650,7 @@ public class PathChildrenCache implements Closeable
             {
                 offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED,
data)));
             }
+            updateInitialSet(ZKPaths.getNodeFromPath(fullPath));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/cd62a3dd/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCacheEvent.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCacheEvent.java
b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCacheEvent.java
index 9a37a6c..c601130 100644
--- a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCacheEvent.java
+++ b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCacheEvent.java
@@ -46,19 +46,25 @@ public class PathChildrenCacheEvent
         CHILD_REMOVED,
 
         /**
-         * Called when the connection has changed to {@link ConnectionState#SUSPENDED}
+         * Sent when the connection has changed to {@link ConnectionState#SUSPENDED}
          */
         CONNECTION_SUSPENDED,
 
         /**
-         * Called when the connection has changed to {@link ConnectionState#RECONNECTED}
+         * Sent when the connection has changed to {@link ConnectionState#RECONNECTED}
          */
         CONNECTION_RECONNECTED,
 
         /**
-         * Called when the connection has changed to {@link ConnectionState#LOST}
+         * Sent when the connection has changed to {@link ConnectionState#LOST}
          */
-        CONNECTION_LOST
+        CONNECTION_LOST,
+
+        /**
+         * Sent when start(StartMode.POST_INITIALIZED_EVENT) has completed loading
+         * the initial data set
+         */
+        INITIALIZED
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/cd62a3dd/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/RefreshOperation.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/RefreshOperation.java
b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/RefreshOperation.java
index a6988d6..be5bddc 100644
--- a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/RefreshOperation.java
+++ b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/RefreshOperation.java
@@ -19,16 +19,18 @@ package com.netflix.curator.framework.recipes.cache;
 class RefreshOperation implements Operation
 {
     private final PathChildrenCache cache;
+    private final PathChildrenCache.RefreshMode mode;
 
-    RefreshOperation(PathChildrenCache cache)
+    RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode)
     {
         this.cache = cache;
+        this.mode = mode;
     }
 
     @Override
     public void invoke() throws Exception
     {
-        cache.refresh(false);
+        cache.refresh(mode);
     }
 
     @Override
@@ -52,6 +54,6 @@ class RefreshOperation implements Operation
     @Override
     public String toString()
     {
-        return "RefreshOperation{}";
+        return "RefreshOperation(" + mode + ")";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/cd62a3dd/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/cache/TestPathChildrenCache.java
b/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/cache/TestPathChildrenCache.java
index a40bc18..0ab2bcc 100644
--- a/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -23,9 +23,12 @@ import com.netflix.curator.framework.api.UnhandledErrorListener;
 import com.netflix.curator.framework.recipes.BaseClassForTests;
 import com.netflix.curator.retry.RetryOneTime;
 import com.netflix.curator.test.KillSession;
+import com.netflix.curator.test.TestingCluster;
 import com.netflix.curator.test.Timing;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
@@ -36,6 +39,60 @@ import java.util.concurrent.atomic.AtomicReference;
 public class TestPathChildrenCache extends BaseClassForTests
 {
     @Test
+    public void     testChildrenInitialized() throws Exception
+    {
+        Timing              timing = new Timing();
+        PathChildrenCache   cache = null;
+        CuratorFramework    client = CuratorFrameworkFactory.newClient(server.getConnectString(),
timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = new PathChildrenCache(client, "/test", true);
+
+            final CountDownLatch        addedLatch = new CountDownLatch(3);
+            final CountDownLatch        initLatch = new CountDownLatch(1);
+            cache.getListenable().addListener
+            (
+                new PathChildrenCacheListener()
+                {
+                    @Override
+                    public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception
+                    {
+                        if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+                        {
+                            addedLatch.countDown();
+                        }
+                        else if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED
)
+                        {
+                            initLatch.countDown();
+                        }
+                    }
+                }
+            );
+
+            client.create().forPath("/test/1", "1".getBytes());
+            client.create().forPath("/test/2", "2".getBytes());
+            client.create().forPath("/test/3", "3".getBytes());
+
+            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+
+            Assert.assertTrue(timing.awaitLatch(addedLatch));
+            Assert.assertTrue(timing.awaitLatch(initLatch));
+            Assert.assertEquals(cache.getCurrentData().size(), 3);
+            Assert.assertEquals(cache.getCurrentData().get(0).getData(), "1".getBytes());
+            Assert.assertEquals(cache.getCurrentData().get(1).getData(), "2".getBytes());
+            Assert.assertEquals(cache.getCurrentData().get(2).getData(), "3".getBytes());
+        }
+        finally
+        {
+            Closeables.closeQuietly(cache);
+            Closeables.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void     testUpdateWhenNotCachingData() throws Exception
     {
         Timing              timing = new Timing();
@@ -66,7 +123,7 @@ public class TestPathChildrenCache extends BaseClassForTests
                     }
                 }
             );
-            cache.start(true);
+            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL);
 
             client.create().forPath("/test/foo", "first".getBytes());
             Assert.assertTrue(timing.awaitLatch(addedLatch));
@@ -161,7 +218,7 @@ public class TestPathChildrenCache extends BaseClassForTests
                         }
                     }
                 );
-            cache.start(true);
+            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL);
 
             client.delete().forPath("/test/foo");
             Assert.assertTrue(removedLatch.await(10, TimeUnit.SECONDS));
@@ -260,7 +317,7 @@ public class TestPathChildrenCache extends BaseClassForTests
                     }
                 }
             );
-            cache.start(true);
+            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL);
             future.get();
 
             Assert.assertTrue(addedLatch.await(10, TimeUnit.SECONDS));
@@ -362,7 +419,7 @@ public class TestPathChildrenCache extends BaseClassForTests
                     }
                 }
             );
-            cache.start(true);
+            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL);
 
             client.delete().forPath("/base/a");
             Assert.assertTrue(semaphore.tryAcquire(1, 10, TimeUnit.SECONDS));
@@ -489,7 +546,7 @@ public class TestPathChildrenCache extends BaseClassForTests
                     latch.countDown();
                 }
             };
-            cache.start(true);
+            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL);
 
             latch.await();
 


Mime
View raw message