curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [03/10] git commit: wip
Date Sat, 22 Mar 2014 13:35:18 GMT
wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/dd1fe969
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/dd1fe969
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/dd1fe969

Branch: refs/heads/rest
Commit: dd1fe969f725fa84317789e08e3228131105c0f6
Parents: d30a266
Author: randgalt <randgalt@apache.org>
Authored: Wed Jan 8 16:40:25 2014 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Wed Jan 8 16:40:25 2014 -0500

----------------------------------------------------------------------
 .../recipes/cache/PathChildrenCache.java        |  11 ++
 .../curator/x/rest/ConnectionResource.java      |   2 +-
 .../curator/x/rest/PathCacheRecipeResource.java | 118 ++++++++++---
 .../x/rest/entity/PathChildrenCacheEntity.java  |  84 ++++++++++
 .../entity/PathChildrenCacheEventEntity.java    |  84 ++++++++++
 .../curator/x/rest/entity/StatEntity.java       | 167 +++++++++++++++++++
 .../x/rest/system/PathChildrenCacheThing.java   |  59 +++++++
 .../apache/curator/x/rest/system/ThingType.java |  11 +-
 8 files changed, 503 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index d66f7f3..a1844db 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -170,6 +170,17 @@ public class PathChildrenCache implements Closeable
     }
 
     /**
+     * @param client    the client
+     * @param path      path to watch
+     * @param cacheData if true, node contents are cached in addition to the stat
+     * @param dataIsCompressed if true, data in the path is compressed
+     */
+    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean
dataIsCompressed)
+    {
+        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory),
true));
+    }
+
+    /**
      * @param client        the client
      * @param path          path to watch
      * @param cacheData     if true, node contents are cached in addition to the stat

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
index 6d3b559..4f70111 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
@@ -48,7 +48,7 @@ public class ConnectionResource
         connectionsManager = contextResolver.getContext(ConnectionsManager.class);
     }
 
-    @POST
+    @GET
     @Path("{id}/block-on-state-change")
     public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse,
@PathParam("id") String id, @QueryParam("state-count") String currentStateCountArg)
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
index 1fc7b3d..d953bd6 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
@@ -19,14 +19,22 @@
 
 package org.apache.curator.x.rest;
 
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.rest.entity.LockRequestEntity;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.x.rest.entity.PathChildrenCacheEntity;
+import org.apache.curator.x.rest.entity.PathChildrenCacheEventEntity;
+import org.apache.curator.x.rest.entity.StatEntity;
 import org.apache.curator.x.rest.system.Connection;
 import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.PathChildrenCacheThing;
 import org.apache.curator.x.rest.system.ThingKey;
 import org.apache.curator.x.rest.system.ThingType;
-import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -34,10 +42,13 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.Context;
+import javax.ws.rs.core.GenericEntity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ContextResolver;
-import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Future;
 
 @Path("zookeeper/recipes/path-cache/{connectionId}")
 public class PathCacheRecipeResource
@@ -50,9 +61,8 @@ public class PathCacheRecipeResource
     }
 
     @POST
-    @Path("{path:.*}")
     @Produces(MediaType.APPLICATION_JSON)
-    public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId,
@PathParam("path") String path) throws Exception
+    public Response allocate(@PathParam("connectionId") String connectionId, PathChildrenCacheEntity
spec) throws Exception
     {
         Connection connection = connectionsManager.get(connectionId);
         if ( connection == null )
@@ -60,16 +70,22 @@ public class PathCacheRecipeResource
             return Response.status(Response.Status.NOT_FOUND).build();
         }
 
-        InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(),
path);
-        ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
-        connection.putThing(key, mutex);
+        PathChildrenCache cache = new PathChildrenCache(connection.getClient(), spec.getPath(),
spec.isCacheData(), spec.isDataIsCompressed());
+        PathChildrenCacheThing cacheThing = new PathChildrenCacheThing(cache);
+        cache.getListenable().addListener(new LocalListener(cacheThing));
+
+        ThingKey<PathChildrenCacheThing> key = new ThingKey<PathChildrenCacheThing>(ThingType.PATH_CACHE);
+        connection.putThing(key, cacheThing);
+
+        PathChildrenCache.StartMode startMode = spec.isBuildInitial() ? PathChildrenCache.StartMode.POST_INITIALIZED_EVENT
: PathChildrenCache.StartMode.NORMAL;
+        cache.start(startMode);
 
         return Response.ok(key.getId()).build();
     }
 
     @DELETE
     @Path("{id}")
-    public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id")
String lockId) throws Exception
+    public Response delete(@PathParam("connectionId") String connectionId, @PathParam("id")
String cacheId) throws IOException
     {
         Connection connection = connectionsManager.get(connectionId);
         if ( connection == null )
@@ -77,23 +93,20 @@ public class PathCacheRecipeResource
             return Response.status(Response.Status.NOT_FOUND).build();
         }
 
-        InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId,
ThingType.MUTEX));
-        if ( mutex == null )
+        PathChildrenCacheThing cacheThing = connection.removeThing(new ThingKey<PathChildrenCacheThing>(cacheId,
ThingType.PATH_CACHE));
+        if ( cacheThing == null )
         {
             return Response.status(Response.Status.NOT_FOUND).build();
         }
 
-        if ( mutex.isAcquiredInThisProcess() )
-        {
-            mutex.release();
-        }
+        cacheThing.getCache().close();
 
         return Response.ok().build();
     }
 
-    @POST
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId")
String connectionId, final LockRequestEntity lockRequest) throws Exception
+    @GET
+    @Path("{id}/block-on-events")
+    public void getEvents(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId")
String connectionId, @PathParam("id") String cacheId)
     {
         Connection connection = connectionsManager.get(connectionId);
         if ( connection == null )
@@ -102,14 +115,14 @@ public class PathCacheRecipeResource
             return;
         }
 
-        final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(),
ThingType.MUTEX));
-        if ( mutex == null )
+        final PathChildrenCacheThing cacheThing = connection.removeThing(new ThingKey<PathChildrenCacheThing>(cacheId,
ThingType.PATH_CACHE));
+        if ( cacheThing == null )
         {
             asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
             return;
         }
 
-        connectionsManager.getExecutorService().submit
+        Future<?> future = connectionsManager.getExecutorService().submit
         (
             new Runnable()
             {
@@ -118,15 +131,68 @@ public class PathCacheRecipeResource
                 {
                     try
                     {
-                        boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
-                        asyncResponse.resume(Response.status(success ? Response.Status.OK
: Response.Status.REQUEST_TIMEOUT).build());
+                        List<PathChildrenCacheEvent> events = cacheThing.blockForPendingEvents();
+                        List<PathChildrenCacheEventEntity> transformed = Lists.transform(events,
toEntity);
+                        GenericEntity<List<PathChildrenCacheEventEntity>> entity
= new GenericEntity<List<PathChildrenCacheEventEntity>>(transformed){};
+                        asyncResponse.resume(Response.ok(entity).build());
                     }
-                    catch ( Exception e )
+                    catch ( InterruptedException e )
                     {
-                        asyncResponse.resume(e);
+                        Thread.currentThread().interrupt();
+                        asyncResponse.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build());
                     }
                 }
             }
         );
+        connection.putThing(new ThingKey<Future>(ThingType.FUTURE), future);
+    }
+
+    private static final Function<PathChildrenCacheEvent, PathChildrenCacheEventEntity>
toEntity = new Function<PathChildrenCacheEvent, PathChildrenCacheEventEntity>()
+    {   // converts one cache event into the bean returned by getEvents(); used with Lists.transform
+        @Override
+        public PathChildrenCacheEventEntity apply(PathChildrenCacheEvent event)
+        {
+            String path = (event.getData() != null) ? event.getData().getPath() : null; // path, data and stat are each null when event.getData() is null
+            String data = ((event.getData() != null) && (event.getData().getData()
!= null)) ? new String((event.getData().getData())) : null; // NOTE(review): new String(byte[]) decodes with the platform default charset - confirm the intended encoding
+            StatEntity stat = ((event.getData() != null) && (event.getData().getStat()
!= null))
+                ? new StatEntity
+                (
+                    event.getData().getStat().getCzxid(),
+                    event.getData().getStat().getMzxid(),
+                    event.getData().getStat().getCtime(),
+                    event.getData().getStat().getMtime(),
+                    event.getData().getStat().getVersion(),
+                    event.getData().getStat().getCversion(),
+                    event.getData().getStat().getAversion(),
+                    event.getData().getStat().getEphemeralOwner(),
+                    event.getData().getStat().getDataLength(),
+                    event.getData().getStat().getNumChildren(),
+                    event.getData().getStat().getPzxid()
+                )
+                : null;
+            return new PathChildrenCacheEventEntity
+            (
+                event.getType().name(), // event type is sent as the enum constant's name
+                path,
+                data,
+                stat
+            );
+        }
+    };
+
+    private static class LocalListener implements PathChildrenCacheListener // forwards every cache event into the given PathChildrenCacheThing
+    {
+        private final PathChildrenCacheThing cacheThing;
+
+        public LocalListener(PathChildrenCacheThing cacheThing)
+        {
+            this.cacheThing = cacheThing;
+        }
+
+        @Override
+        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws
Exception
+        {
+            cacheThing.addEvent(event); // buffered until a getEvents() request drains it via blockForPendingEvents()
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
new file mode 100644
index 0000000..c2c3328
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class PathChildrenCacheEntity // bean received as the "spec" argument of PathCacheRecipeResource.allocate(); describes the cache to create
+{
+    private String path;
+    private boolean dataIsCompressed;
+    private boolean cacheData;
+    private boolean buildInitial;
+
+    public PathChildrenCacheEntity() // defaults: empty path, all flags false
+    {
+        this("", false, false, false);
+    }
+
+    public PathChildrenCacheEntity(String path, boolean dataIsCompressed, boolean cacheData,
boolean buildInitial)
+    {   // NOTE: flag order here is (dataIsCompressed, cacheData), the reverse of the PathChildrenCache constructor's (cacheData, dataIsCompressed)
+        this.path = path;
+        this.dataIsCompressed = dataIsCompressed;
+        this.cacheData = cacheData;
+        this.buildInitial = buildInitial;
+    }
+
+    public String getPath()
+    {
+        return path;
+    }
+
+    public void setPath(String path)
+    {
+        this.path = path;
+    }
+
+    public boolean isDataIsCompressed()
+    {
+        return dataIsCompressed;
+    }
+
+    public void setDataIsCompressed(boolean dataIsCompressed)
+    {
+        this.dataIsCompressed = dataIsCompressed;
+    }
+
+    public boolean isCacheData()
+    {
+        return cacheData;
+    }
+
+    public void setCacheData(boolean cacheData)
+    {
+        this.cacheData = cacheData;
+    }
+
+    public boolean isBuildInitial() // when true, allocate() starts the cache with StartMode.POST_INITIALIZED_EVENT instead of StartMode.NORMAL
+    {
+        return buildInitial;
+    }
+
+    public void setBuildInitial(boolean buildInitial)
+    {
+        this.buildInitial = buildInitial;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
new file mode 100644
index 0000000..9b08d32
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class PathChildrenCacheEventEntity // bean form of one PathChildrenCacheEvent; built by the toEntity function in PathCacheRecipeResource
+{
+    private String eventType; // toEntity stores event.getType().name() here
+    private String path; // may be null: toEntity passes null when the event carries no data
+    private String data; // may be null: toEntity passes null when the event data or its bytes are null
+    private StatEntity stat; // may be null: toEntity passes null when the event data or its stat is null
+
+    public PathChildrenCacheEventEntity() // defaults: empty strings and a default-constructed StatEntity
+    {
+        this("", "", "", new StatEntity());
+    }
+
+    public PathChildrenCacheEventEntity(String eventType, String path, String data, StatEntity
stat)
+    {
+        this.eventType = eventType;
+        this.path = path;
+        this.data = data;
+        this.stat = stat;
+    }
+
+    public String getEventType()
+    {
+        return eventType;
+    }
+
+    public void setEventType(String eventType)
+    {
+        this.eventType = eventType;
+    }
+
+    public String getPath()
+    {
+        return path;
+    }
+
+    public void setPath(String path)
+    {
+        this.path = path;
+    }
+
+    public String getData()
+    {
+        return data;
+    }
+
+    public void setData(String data)
+    {
+        this.data = data;
+    }
+
+    public StatEntity getStat()
+    {
+        return stat;
+    }
+
+    public void setStat(StatEntity stat)
+    {
+        this.stat = stat;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
new file mode 100644
index 0000000..ef323ec
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class StatEntity // bean holding the eleven numeric values that PathCacheRecipeResource.toEntity copies from event.getData().getStat()
+{
+    private long czxid;
+    private long mzxid;
+    private long ctime;
+    private long mtime;
+    private int version;
+    private int cversion;
+    private int aversion;
+    private long ephemeralOwner;
+    private int dataLength;
+    private int numChildren;
+    private long pzxid;
+
+    public StatEntity() // assigns nothing, so every field keeps its Java default of 0
+    {
+    }
+
+    public StatEntity(long czxid, long mzxid, long ctime, long mtime, int version, int cversion,
int aversion, long ephemeralOwner, int dataLength, int numChildren, long pzxid)
+    {   // argument order matches the call made in PathCacheRecipeResource.toEntity
+        this.czxid = czxid;
+        this.mzxid = mzxid;
+        this.ctime = ctime;
+        this.mtime = mtime;
+        this.version = version;
+        this.cversion = cversion;
+        this.aversion = aversion;
+        this.ephemeralOwner = ephemeralOwner;
+        this.dataLength = dataLength;
+        this.numChildren = numChildren;
+        this.pzxid = pzxid;
+    }
+
+    public long getCzxid()
+    {
+        return czxid;
+    }
+
+    public void setCzxid(long czxid)
+    {
+        this.czxid = czxid;
+    }
+
+    public long getMzxid()
+    {
+        return mzxid;
+    }
+
+    public void setMzxid(long mzxid)
+    {
+        this.mzxid = mzxid;
+    }
+
+    public long getCtime()
+    {
+        return ctime;
+    }
+
+    public void setCtime(long ctime)
+    {
+        this.ctime = ctime;
+    }
+
+    public long getMtime()
+    {
+        return mtime;
+    }
+
+    public void setMtime(long mtime)
+    {
+        this.mtime = mtime;
+    }
+
+    public int getVersion()
+    {
+        return version;
+    }
+
+    public void setVersion(int version)
+    {
+        this.version = version;
+    }
+
+    public int getCversion()
+    {
+        return cversion;
+    }
+
+    public void setCversion(int cversion)
+    {
+        this.cversion = cversion;
+    }
+
+    public int getAversion()
+    {
+        return aversion;
+    }
+
+    public void setAversion(int aversion)
+    {
+        this.aversion = aversion;
+    }
+
+    public long getEphemeralOwner()
+    {
+        return ephemeralOwner;
+    }
+
+    public void setEphemeralOwner(long ephemeralOwner)
+    {
+        this.ephemeralOwner = ephemeralOwner;
+    }
+
+    public int getDataLength()
+    {
+        return dataLength;
+    }
+
+    public void setDataLength(int dataLength)
+    {
+        this.dataLength = dataLength;
+    }
+
+    public int getNumChildren()
+    {
+        return numChildren;
+    }
+
+    public void setNumChildren(int numChildren)
+    {
+        this.numChildren = numChildren;
+    }
+
+    public long getPzxid()
+    {
+        return pzxid;
+    }
+
+    public void setPzxid(long pzxid)
+    {
+        this.pzxid = pzxid;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
new file mode 100644
index 0000000..1c507aa
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import java.util.List;
+
+public class PathChildrenCacheThing // pairs a PathChildrenCache with a buffer of events that have not yet been handed to a caller
+{
+    private final PathChildrenCache cache;
+    private final List<PathChildrenCacheEvent> events = Lists.newLinkedList(); // pending events; only accessed from the synchronized methods below
+
+    public PathChildrenCacheThing(PathChildrenCache cache)
+    {
+        this.cache = cache;
+    }
+
+    public PathChildrenCache getCache()
+    {
+        return cache;
+    }
+
+    public synchronized List<PathChildrenCacheEvent> blockForPendingEvents() throws
InterruptedException
+    {   // blocks until at least one event is buffered, then returns a copy of the buffer and empties it
+        while ( events.size() == 0 ) // condition is re-checked after every wake-up
+        {
+            wait(); // releases this object's monitor until addEvent() calls notifyAll()
+        }
+
+        List<PathChildrenCacheEvent> localEvents = Lists.newArrayList(events); // snapshot taken before the buffer is cleared
+        events.clear();
+        return localEvents;
+    }
+
+    public synchronized void addEvent(PathChildrenCacheEvent event) // buffers the event and wakes any thread waiting in blockForPendingEvents()
+    {
+        events.add(event);
+        notifyAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
index 13448dd..81251fc 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
@@ -20,7 +20,6 @@
 package org.apache.curator.x.rest.system;
 
 import com.google.common.io.Closeables;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import java.util.concurrent.Future;
 
@@ -41,18 +40,18 @@ public interface ThingType<T>
         }
     };
 
-    public static ThingType<PathChildrenCache> PATH_CACHE = new ThingType<PathChildrenCache>()
+    public static ThingType<PathChildrenCacheThing> PATH_CACHE = new ThingType<PathChildrenCacheThing>()
     {
         @Override
-        public Class<PathChildrenCache> getThingClass()
+        public Class<PathChildrenCacheThing> getThingClass()
         {
-            return PathChildrenCache.class;
+            return PathChildrenCacheThing.class;
         }
 
         @Override
-        public void closeFor(PathChildrenCache cache)
+        public void closeFor(PathChildrenCacheThing cache)
         {
-            Closeables.closeQuietly(cache);
+            Closeables.closeQuietly(cache.getCache());
         }
     };
 


Mime
View raw message