curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [02/10] git commit: wip
Date Sat, 22 Mar 2014 13:35:17 GMT
wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d30a2664
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d30a2664
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d30a2664

Branch: refs/heads/rest
Commit: d30a2664596dbc4abc94929e3d98b3301a12b2e7
Parents: 91fb388
Author: randgalt <randgalt@apache.org>
Authored: Wed Jan 8 15:41:38 2014 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Wed Jan 8 15:41:38 2014 -0500

----------------------------------------------------------------------
 .../curator/x/rest/ConnectionResource.java      |  13 +-
 .../curator/x/rest/LeaderRecipeResource.java    | 132 -------------------
 .../curator/x/rest/PathCacheRecipeResource.java | 132 +++++++++++++++++++
 .../x/rest/entity/ConnectionStateEntity.java    |  60 +++++++++
 .../curator/x/rest/system/Connection.java       |  37 +++++-
 .../apache/curator/x/rest/system/ThingType.java |  12 +-
 6 files changed, 236 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
index 88fd202..6d3b559 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
@@ -19,7 +19,6 @@
 
 package org.apache.curator.x.rest;
 
-import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.x.rest.system.Connection;
 import org.apache.curator.x.rest.system.ConnectionsManager;
 import org.apache.curator.x.rest.system.ThingKey;
@@ -30,6 +29,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.Context;
@@ -49,8 +49,8 @@ public class ConnectionResource
     }
 
     @POST
-    @Path("{id}/connection-state-change")
-    public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse,
@PathParam("id") String id)
+    @Path("{id}/block-on-state-change")
+    public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse,
@PathParam("id") String id, @QueryParam("state-count") String currentStateCountArg)
     {
         final Connection connection = connectionsManager.get(id);
         if ( connection == null )
@@ -59,6 +59,7 @@ public class ConnectionResource
             return;
         }
 
+        final long currentStateCount = (currentStateCountArg != null) ? Long.parseLong(currentStateCountArg)
: -1;
         Future<?> future = connectionsManager.getExecutorService().submit(new Runnable()
         {
             @Override
@@ -66,8 +67,8 @@ public class ConnectionResource
             {
                 try
                 {
-                    ConnectionState state = connection.blockingPopStateChange();
-                    asyncResponse.resume(Response.ok(state.name()).build());
+                    connection.blockUntilStateChange(currentStateCount);
+                    asyncResponse.resume(Response.ok().build());
                 }
                 catch ( InterruptedException e )
                 {
@@ -90,7 +91,7 @@ public class ConnectionResource
             return Response.status(Response.Status.NOT_FOUND).build();
         }
 
-        return Response.ok(connection.getClient().getState().name()).build();
+        return Response.ok(connection.getState()).build();
     }
 
     @POST

http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
deleted file mode 100644
index 868fb33..0000000
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.x.rest;
-
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.rest.entity.LockRequestEntity;
-import org.apache.curator.x.rest.system.Connection;
-import org.apache.curator.x.rest.system.ConnectionsManager;
-import org.apache.curator.x.rest.system.ThingKey;
-import org.apache.curator.x.rest.system.ThingType;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.Suspended;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.ext.ContextResolver;
-import java.util.concurrent.TimeUnit;
-
-@Path("zookeeper/recipes/leader/{connectionId}")
-public class LeaderRecipeResource
-{
-    private final ConnectionsManager connectionsManager;
-
-    public LeaderRecipeResource(@Context ContextResolver<ConnectionsManager> contextResolver)
-    {
-        connectionsManager = contextResolver.getContext(ConnectionsManager.class);
-    }
-
-    @POST
-    @Path("{path:.*}")
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId,
@PathParam("path") String path) throws Exception
-    {
-        Connection connection = connectionsManager.get(connectionId);
-        if ( connection == null )
-        {
-            return Response.status(Response.Status.NOT_FOUND).build();
-        }
-
-        InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(),
path);
-        ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
-        connection.putThing(key, mutex);
-
-        return Response.ok(key.getId()).build();
-    }
-
-    @DELETE
-    @Path("{id}")
-    public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id")
String lockId) throws Exception
-    {
-        Connection connection = connectionsManager.get(connectionId);
-        if ( connection == null )
-        {
-            return Response.status(Response.Status.NOT_FOUND).build();
-        }
-
-        InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId,
ThingType.MUTEX));
-        if ( mutex == null )
-        {
-            return Response.status(Response.Status.NOT_FOUND).build();
-        }
-
-        if ( mutex.isAcquiredInThisProcess() )
-        {
-            mutex.release();
-        }
-
-        return Response.ok().build();
-    }
-
-    @POST
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId")
String connectionId, final LockRequestEntity lockRequest) throws Exception
-    {
-        Connection connection = connectionsManager.get(connectionId);
-        if ( connection == null )
-        {
-            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
-            return;
-        }
-
-        final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(),
ThingType.MUTEX));
-        if ( mutex == null )
-        {
-            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
-            return;
-        }
-
-        connectionsManager.getExecutorService().submit
-        (
-            new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    try
-                    {
-                        boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
-                        asyncResponse.resume(Response.status(success ? Response.Status.OK
: Response.Status.REQUEST_TIMEOUT).build());
-                    }
-                    catch ( Exception e )
-                    {
-                        asyncResponse.resume(e);
-                    }
-                }
-            }
-        );
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
new file mode 100644
index 0000000..1fc7b3d
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest;
+
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.x.rest.entity.LockRequestEntity;
+import org.apache.curator.x.rest.system.Connection;
+import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.ThingKey;
+import org.apache.curator.x.rest.system.ThingType;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ContextResolver;
+import java.util.concurrent.TimeUnit;
+
+@Path("zookeeper/recipes/path-cache/{connectionId}")
+public class PathCacheRecipeResource
+{
+    private final ConnectionsManager connectionsManager;
+
+    public PathCacheRecipeResource(@Context ContextResolver<ConnectionsManager> contextResolver)
+    {
+        connectionsManager = contextResolver.getContext(ConnectionsManager.class);
+    }
+
+    @POST
+    @Path("{path:.*}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId,
@PathParam("path") String path) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(),
path);
+        ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
+        connection.putThing(key, mutex);
+
+        return Response.ok(key.getId()).build();
+    }
+
+    @DELETE
+    @Path("{id}")
+    public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id")
String lockId) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId,
ThingType.MUTEX));
+        if ( mutex == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        if ( mutex.isAcquiredInThisProcess() )
+        {
+            mutex.release();
+        }
+
+        return Response.ok().build();
+    }
+
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId")
String connectionId, final LockRequestEntity lockRequest) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(),
ThingType.MUTEX));
+        if ( mutex == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        connectionsManager.getExecutorService().submit
+        (
+            new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
+                        asyncResponse.resume(Response.status(success ? Response.Status.OK
: Response.Status.REQUEST_TIMEOUT).build());
+                    }
+                    catch ( Exception e )
+                    {
+                        asyncResponse.resume(e);
+                    }
+                }
+            }
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/ConnectionStateEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/ConnectionStateEntity.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/ConnectionStateEntity.java
new file mode 100644
index 0000000..797c26e
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/ConnectionStateEntity.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class ConnectionStateEntity
+{
+    private String state;
+    private long stateCount;
+
+    public ConnectionStateEntity()
+    {
+        this("", -1);
+    }
+
+    public ConnectionStateEntity(String state, long stateCount)
+    {
+        this.state = state;
+        this.stateCount = stateCount;
+    }
+
+    public String getState()
+    {
+        return state;
+    }
+
+    public void setState(String state)
+    {
+        this.state = state;
+    }
+
+    public long getStateCount()
+    {
+        return stateCount;
+    }
+
+    public void setStateCount(long stateCount)
+    {
+        this.stateCount = stateCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
index 7f46de1..7ef36dd 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
@@ -19,14 +19,14 @@
 
 package org.apache.curator.x.rest.system;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.x.rest.entity.ConnectionStateEntity;
 import java.io.Closeable;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class Connection implements Closeable, ConnectionStateListener
@@ -34,7 +34,8 @@ public class Connection implements Closeable, ConnectionStateListener
     private final CuratorFramework client;
     private final AtomicLong lastUseMs = new AtomicLong(System.currentTimeMillis());
     private final Map<ThingKey, Object> things = Maps.newConcurrentMap();
-    private final BlockingQueue<ConnectionState> states = Queues.newLinkedBlockingQueue();
+    private final Object stateLock = new Object();
+    private long stateCount = 0;  // guarded by stateLock
 
     public Connection(CuratorFramework client)
     {
@@ -51,12 +52,35 @@ public class Connection implements Closeable, ConnectionStateListener
     @Override
     public void stateChanged(CuratorFramework client, ConnectionState newState)
     {
-        states.add(newState);
+        synchronized(stateLock)
+        {
+            ++stateCount;
+            stateLock.notifyAll();
+        }
+    }
+
+    public ConnectionStateEntity getState()
+    {
+        synchronized(stateLock)
+        {
+            return new ConnectionStateEntity(client.getState().name(), stateCount);
+        }
     }
 
-    public ConnectionState blockingPopStateChange() throws InterruptedException
+    public void blockUntilStateChange(long expectedStateCount) throws InterruptedException
     {
-        return states.take();
+        synchronized(stateLock)
+        {
+            if ( expectedStateCount < 0 )
+            {
+                expectedStateCount = stateCount;
+            }
+
+            while ( stateCount == expectedStateCount )
+            {
+                stateLock.wait();
+            }
+        }
     }
 
     public void updateUse()
@@ -81,6 +105,7 @@ public class Connection implements Closeable, ConnectionStateListener
 
     public <T> void putThing(ThingKey<T> key, T thing)
     {
+        thing = Preconditions.checkNotNull(thing, "thing cannot be null");
         things.put(key, thing);
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
index d09f0cc..13448dd 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
@@ -20,7 +20,7 @@
 package org.apache.curator.x.rest.system;
 
 import com.google.common.io.Closeables;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import java.util.concurrent.Future;
 
@@ -41,18 +41,18 @@ public interface ThingType<T>
         }
     };
 
-    public static ThingType<LeaderLatch> LEADER = new ThingType<LeaderLatch>()
+    public static ThingType<PathChildrenCache> PATH_CACHE = new ThingType<PathChildrenCache>()
     {
         @Override
-        public Class<LeaderLatch> getThingClass()
+        public Class<PathChildrenCache> getThingClass()
         {
-            return LeaderLatch.class;
+            return PathChildrenCache.class;
         }
 
         @Override
-        public void closeFor(LeaderLatch latch)
+        public void closeFor(PathChildrenCache cache)
         {
-            Closeables.closeQuietly(latch);
+            Closeables.closeQuietly(cache);
         }
     };
 


Mime
View raw message