curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [1/2] git commit: wip
Date Thu, 29 May 2014 01:35:11 GMT
Repository: curator
Updated Branches:
  refs/heads/curator-rpc 598ad996c -> 530010d90


wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e5ecbcd9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e5ecbcd9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e5ecbcd9

Branch: refs/heads/curator-rpc
Commit: e5ecbcd96fd84611a11a34074d63ae05a77df0d9
Parents: 598ad99
Author: randgalt <randgalt@apache.org>
Authored: Wed May 28 13:11:49 2014 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Wed May 28 13:11:49 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/curator/x/rpc/Closer.java   |   6 -
 .../org/apache/curator/x/rpc/CuratorEntry.java  | 127 -------------------
 .../curator/x/rpc/CuratorProjectionServer.java  |  12 +-
 .../org/apache/curator/x/rpc/RpcManager.java    |  66 ----------
 .../x/rpc/configuration/Configuration.java      |   3 +-
 .../configuration/ConnectionConfiguration.java  |  27 +++-
 .../curator/x/rpc/connections/Closer.java       |   6 +
 .../x/rpc/connections/ConnectionManager.java    | 115 +++++++++++++++++
 .../curator/x/rpc/connections/CuratorEntry.java | 127 +++++++++++++++++++
 .../curator/x/rpc/idl/event/EventService.java   |  12 +-
 .../projection/CuratorProjectionService.java    |  20 +--
 11 files changed, 299 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/Closer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/Closer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/Closer.java
deleted file mode 100644
index a389776..0000000
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/Closer.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.apache.curator.x.rpc;
-
-public interface Closer<T>
-{
-    public void close(T thing);
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
deleted file mode 100644
index 52f025d..0000000
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.apache.curator.x.rpc;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class CuratorEntry implements Closeable
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
-    private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue();
-    private final AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
-    private final Map<String, Entry> things = Maps.newConcurrentMap();
-
-    private static class Entry
-    {
-        final Object thing;
-        final Closer closer;
-
-        private Entry(Object thing, Closer closer)
-        {
-            this.thing = thing;
-            this.closer = closer;
-        }
-    }
-
-    private enum State
-    {
-        OPEN,
-        CLOSED
-    }
-
-    public CuratorEntry(CuratorFramework client)
-    {
-        this.client = client;
-    }
-
-    @Override
-    public void close()
-    {
-        if ( state.compareAndSet(State.OPEN, State.CLOSED) )
-        {
-            for ( Map.Entry<String, Entry> mapEntry : things.entrySet() )
-            {
-                Entry entry = mapEntry.getValue();
-                if ( entry.closer != null )
-                {
-                    log.debug(String.format("Closing left over thing. Type: %s - Id: %s", entry.thing.getClass(), mapEntry.getKey()));
-                    //noinspection unchecked
-                    entry.closer.close(entry.thing);    // lack of generics is safe because addThing() is type-safe
-                }
-            }
-            things.clear();
-
-            client.close();
-            events.clear();
-        }
-    }
-
-    public RpcCuratorEvent pollForEvent(long maxWaitMs) throws InterruptedException
-    {
-        if ( state.get() == State.OPEN )
-        {
-            return events.poll(maxWaitMs, TimeUnit.MILLISECONDS);
-        }
-        return null;
-    }
-
-    public void addEvent(RpcCuratorEvent event)
-    {
-        if ( state.get() == State.OPEN )
-        {
-            events.offer(event);
-        }
-    }
-
-    public CuratorFramework getClient()
-    {
-        return (state.get() == State.OPEN) ? client : null;
-    }
-
-    public <T> String addThing(T thing, Closer<T> closer)
-    {
-        return addThing(UUID.randomUUID().toString(), thing, closer);
-    }
-
-    public <T> String addThing(String id, T thing, Closer<T> closer)
-    {
-        things.put(id, new Entry(thing, closer));
-        return id;
-    }
-
-    public <T> T getThing(String id, Class<T> clazz)
-    {
-        Entry entry = (id != null) ? things.get(id) : null;
-        return cast(clazz, entry);
-    }
-
-    public boolean closeThing(String id)
-    {
-        Entry entry = (id != null) ? things.remove(id) : null;
-        if ( entry != null )
-        {
-            //noinspection unchecked
-            entry.closer.close(entry.thing);
-        }
-        return false;
-    }
-
-    private <T> T cast(Class<T> clazz, Entry entry)
-    {
-        if ( entry != null )
-        {
-            return clazz.cast(entry.thing);
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
index 2dcb99e..e8ebfc5 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
@@ -29,6 +29,7 @@ import com.google.common.io.Files;
 import com.google.common.io.Resources;
 import org.apache.curator.x.rpc.configuration.Configuration;
 import org.apache.curator.x.rpc.configuration.ConfigurationBuilder;
+import org.apache.curator.x.rpc.connections.ConnectionManager;
 import org.apache.curator.x.rpc.idl.event.EventService;
 import org.apache.curator.x.rpc.idl.projection.CuratorProjectionService;
 import org.slf4j.Logger;
@@ -42,7 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class CuratorProjectionServer
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final RpcManager rpcManager;
+    private final ConnectionManager connectionManager;
     private final ThriftServer server;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final Configuration configuration;
@@ -94,9 +95,9 @@ public class CuratorProjectionServer
     public CuratorProjectionServer(Configuration configuration)
     {
         this.configuration = configuration;
-        rpcManager = new RpcManager(configuration.getProjectionExpiration().toMillis());
-        EventService eventService = new EventService(rpcManager, configuration.getPingTime().toMillis());
-        CuratorProjectionService projectionService = new CuratorProjectionService(rpcManager);
+        connectionManager = new ConnectionManager(configuration.getConnections(), configuration.getProjectionExpiration().toMillis());
+        EventService eventService = new EventService(connectionManager, configuration.getPingTime().toMillis());
+        CuratorProjectionService projectionService = new CuratorProjectionService(connectionManager);
         ThriftServiceProcessor processor = new ThriftServiceProcessor(new ThriftCodecManager(), Lists.<ThriftEventHandler>newArrayList(), projectionService, eventService);
         server = new ThriftServer(processor, configuration.getThrift());
     }
@@ -106,6 +107,7 @@ public class CuratorProjectionServer
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
 
         configuration.getLogging().configure(new MetricRegistry(), "curator-rpc");
+        connectionManager.start();
         server.start();
 
         log.info("Server listening on port: " + configuration.getThrift().getPort());
@@ -117,8 +119,8 @@ public class CuratorProjectionServer
         {
             log.info("Stopping...");
 
-            rpcManager.close();
             server.close();
+            connectionManager.close();
             configuration.getLogging().stop();
 
             log.info("Stopped");

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
deleted file mode 100644
index e238aa6..0000000
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.apache.curator.x.rpc;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.util.concurrent.TimeUnit;
-
-public class RpcManager implements Closeable
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final Cache<String, CuratorEntry> cache;
-
-    public RpcManager(long expirationMs)
-    {
-        RemovalListener<String, CuratorEntry> listener = new RemovalListener<String, CuratorEntry>()
-        {
-            @SuppressWarnings("NullableProblems")
-            @Override
-            public void onRemoval(RemovalNotification<String, CuratorEntry> notification)
-            {
-                if ( notification != null )
-                {
-                    log.debug(String.format("Entry being removed. id (%s), reason (%s)", notification.getKey(), notification.getCause()));
-
-                    CuratorEntry entry = notification.getValue();
-                    if ( entry != null )
-                    {
-                        entry.close();
-                    }
-                }
-            }
-        };
-        cache = CacheBuilder
-            .newBuilder()
-            .expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS)
-            .removalListener(listener)
-            .build();
-    }
-
-    @Override
-    public void close()
-    {
-        cache.invalidateAll();
-        cache.cleanUp();
-    }
-
-    public void add(String id, CuratorFramework client)
-    {
-        cache.put(id, new CuratorEntry(client));
-    }
-
-    public CuratorEntry get(String id)
-    {
-        return cache.getIfPresent(id);
-    }
-
-    public CuratorEntry remove(String id)
-    {
-        return cache.asMap().remove(id);
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java
index e567cac..6d567b7 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java
@@ -1,6 +1,7 @@
 package org.apache.curator.x.rpc.configuration;
 
 import com.facebook.swift.service.ThriftServerConfig;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import io.airlift.units.Duration;
 import io.dropwizard.logging.LoggingFactory;
@@ -57,7 +58,7 @@ public class Configuration
 
     public List<ConnectionConfiguration> getConnections()
     {
-        return connections;
+        return ImmutableList.copyOf(connections);
     }
 
     public void setConnections(List<ConnectionConfiguration> connections)

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java
index 3e2f9a2..51f12fa 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java
@@ -1,13 +1,16 @@
 package org.apache.curator.x.rpc.configuration;
 
+import com.google.common.base.Preconditions;
 import io.airlift.units.Duration;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
 import javax.validation.constraints.NotNull;
 import java.util.concurrent.TimeUnit;
 
 public class ConnectionConfiguration
 {
     @NotNull private String name;
-    private String connectionString = "localhost:2181";
+    private String connectionString = null;
     private Duration sessionLength = new Duration(1, TimeUnit.MINUTES);
     private Duration connectionTimeout = new Duration(15, TimeUnit.SECONDS);
     private AuthorizationConfiguration authorization = null;
@@ -83,4 +86,26 @@ public class ConnectionConfiguration
     {
         this.retry = retry;
     }
+
+    public CuratorFramework build()
+    {
+        Preconditions.checkState((connectionString != null) && (connectionString.length() > 0), "You must specify a connection string for connection: " + name);
+        Preconditions.checkNotNull(retry, "retry cannot be null");
+
+        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+        builder = builder
+            .connectString(connectionString)
+            .sessionTimeoutMs((int)sessionLength.toMillis())
+            .connectionTimeoutMs((int)connectionTimeout.toMillis())
+            .retryPolicy(retry.build());
+        if ( authorization != null )
+        {
+            builder = builder.authorization(authorization.getScheme(), authorization.getAuth().getBytes());
+        }
+        if ( namespace != null )
+        {
+            builder = builder.namespace(namespace);
+        }
+        return builder.build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/Closer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/Closer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/Closer.java
new file mode 100644
index 0000000..060c7d7
--- /dev/null
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/Closer.java
@@ -0,0 +1,6 @@
+package org.apache.curator.x.rpc.connections;
+
+public interface Closer<T>
+{
+    public void close(T thing);
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
new file mode 100644
index 0000000..b101c6d
--- /dev/null
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
@@ -0,0 +1,115 @@
+package org.apache.curator.x.rpc.connections;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableMap;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.x.rpc.configuration.ConnectionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ConnectionManager implements Closeable
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final Cache<String, CuratorEntry> cache;
+    private final Map<String, CuratorFramework> clients;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    public ConnectionManager(List<ConnectionConfiguration> connections, long expirationMs)
+    {
+        RemovalListener<String, CuratorEntry> listener = new RemovalListener<String, CuratorEntry>()
+        {
+            @SuppressWarnings("NullableProblems")
+            @Override
+            public void onRemoval(RemovalNotification<String, CuratorEntry> notification)
+            {
+                if ( notification != null )
+                {
+                    log.debug(String.format("Entry being removed. id (%s), reason (%s)", notification.getKey(), notification.getCause()));
+
+                    CuratorEntry entry = notification.getValue();
+                    if ( entry != null )
+                    {
+                        entry.close();
+                    }
+                }
+            }
+        };
+        cache = CacheBuilder
+            .newBuilder()
+            .expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS)
+            .removalListener(listener)
+            .build();
+
+        clients = buildClients(connections);
+    }
+
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+        for ( CuratorFramework client : clients.values() )
+        {
+            client.start();
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            cache.invalidateAll();
+            cache.cleanUp();
+
+            for ( CuratorFramework client : clients.values() )
+            {
+                client.close();
+            }
+        }
+    }
+
+    public void add(String id, CuratorFramework client)
+    {
+        Preconditions.checkState(state.get() == State.STARTED, "Not started");
+        cache.put(id, new CuratorEntry(client));
+    }
+
+    public CuratorEntry get(String id)
+    {
+        Preconditions.checkState(state.get() == State.STARTED, "Not started");
+        return cache.getIfPresent(id);
+    }
+
+    public CuratorEntry remove(String id)
+    {
+        Preconditions.checkState(state.get() == State.STARTED, "Not started");
+        return cache.asMap().remove(id);
+    }
+
+    private Map<String, CuratorFramework> buildClients(List<ConnectionConfiguration> connections)
+    {
+        Preconditions.checkArgument(connections.size() > 0, "You must have at least one connection configured");
+
+        ImmutableMap.Builder<String, CuratorFramework> builder = ImmutableMap.builder();
+        for ( ConnectionConfiguration configuration : connections )
+        {
+            builder.put(configuration.getName(), configuration.build());
+        }
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
new file mode 100644
index 0000000..0c4bb8b
--- /dev/null
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
@@ -0,0 +1,127 @@
+package org.apache.curator.x.rpc.connections;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CuratorEntry implements Closeable
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue();
+    private final AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
+    private final Map<String, Entry> things = Maps.newConcurrentMap();
+
+    private static class Entry
+    {
+        final Object thing;
+        final Closer closer;
+
+        private Entry(Object thing, Closer closer)
+        {
+            this.thing = thing;
+            this.closer = closer;
+        }
+    }
+
+    private enum State
+    {
+        OPEN,
+        CLOSED
+    }
+
+    public CuratorEntry(CuratorFramework client)
+    {
+        this.client = client;
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.OPEN, State.CLOSED) )
+        {
+            for ( Map.Entry<String, Entry> mapEntry : things.entrySet() )
+            {
+                Entry entry = mapEntry.getValue();
+                if ( entry.closer != null )
+                {
+                    log.debug(String.format("Closing left over thing. Type: %s - Id: %s", entry.thing.getClass(), mapEntry.getKey()));
+                    //noinspection unchecked
+                    entry.closer.close(entry.thing);    // lack of generics is safe because addThing() is type-safe
+                }
+            }
+            things.clear();
+
+            client.close();
+            events.clear();
+        }
+    }
+
+    public RpcCuratorEvent pollForEvent(long maxWaitMs) throws InterruptedException
+    {
+        if ( state.get() == State.OPEN )
+        {
+            return events.poll(maxWaitMs, TimeUnit.MILLISECONDS);
+        }
+        return null;
+    }
+
+    public void addEvent(RpcCuratorEvent event)
+    {
+        if ( state.get() == State.OPEN )
+        {
+            events.offer(event);
+        }
+    }
+
+    public CuratorFramework getClient()
+    {
+        return (state.get() == State.OPEN) ? client : null;
+    }
+
+    public <T> String addThing(T thing, Closer<T> closer)
+    {
+        return addThing(UUID.randomUUID().toString(), thing, closer);
+    }
+
+    public <T> String addThing(String id, T thing, Closer<T> closer)
+    {
+        things.put(id, new Entry(thing, closer));
+        return id;
+    }
+
+    public <T> T getThing(String id, Class<T> clazz)
+    {
+        Entry entry = (id != null) ? things.get(id) : null;
+        return cast(clazz, entry);
+    }
+
+    public boolean closeThing(String id)
+    {
+        Entry entry = (id != null) ? things.remove(id) : null;
+        if ( entry != null )
+        {
+            //noinspection unchecked
+            entry.closer.close(entry.thing);
+        }
+        return false;
+    }
+
+    private <T> T cast(Class<T> clazz, Entry entry)
+    {
+        if ( entry != null )
+        {
+            return clazz.cast(entry.thing);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
index 13598c8..933f3af 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
@@ -2,26 +2,26 @@ package org.apache.curator.x.rpc.idl.event;
 
 import com.facebook.swift.service.ThriftMethod;
 import com.facebook.swift.service.ThriftService;
-import org.apache.curator.x.rpc.CuratorEntry;
-import org.apache.curator.x.rpc.RpcManager;
+import org.apache.curator.x.rpc.connections.CuratorEntry;
+import org.apache.curator.x.rpc.connections.ConnectionManager;
 import org.apache.curator.x.rpc.idl.projection.CuratorProjection;
 
 @ThriftService("EventService")
 public class EventService
 {
-    private final RpcManager rpcManager;
+    private final ConnectionManager connectionManager;
     private final long pingTimeMs;
 
-    public EventService(RpcManager rpcManager, long pingTimeMs)
+    public EventService(ConnectionManager connectionManager, long pingTimeMs)
     {
-        this.rpcManager = rpcManager;
+        this.connectionManager = connectionManager;
         this.pingTimeMs = pingTimeMs;
     }
 
     @ThriftMethod
     public RpcCuratorEvent getNextEvent(CuratorProjection projection) throws InterruptedException
     {
-        CuratorEntry entry = rpcManager.get(projection.id);
+        CuratorEntry entry = connectionManager.get(projection.id);
         if ( entry == null )
         {
             // TODO

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
index 708dc32..b76730d 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
@@ -34,9 +34,9 @@ import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.x.rpc.Closer;
-import org.apache.curator.x.rpc.CuratorEntry;
-import org.apache.curator.x.rpc.RpcManager;
+import org.apache.curator.x.rpc.connections.Closer;
+import org.apache.curator.x.rpc.connections.CuratorEntry;
+import org.apache.curator.x.rpc.connections.ConnectionManager;
 import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,11 +47,11 @@ import java.util.concurrent.TimeUnit;
 public class CuratorProjectionService
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final RpcManager rpcManager;
+    private final ConnectionManager connectionManager;
 
-    public CuratorProjectionService(RpcManager rpcManager)
+    public CuratorProjectionService(ConnectionManager connectionManager)
     {
-        this.rpcManager = rpcManager;
+        this.connectionManager = connectionManager;
     }
 
     @ThriftMethod
@@ -60,7 +60,7 @@ public class CuratorProjectionService
         CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1));
         String id = UUID.randomUUID().toString();
         client.start();
-        rpcManager.add(id, client);
+        connectionManager.add(id, client);
         final CuratorProjection projection = new CuratorProjection(id);
 
         ConnectionStateListener listener = new ConnectionStateListener()
@@ -79,7 +79,7 @@ public class CuratorProjectionService
     @ThriftMethod
     public void closeCuratorProjection(CuratorProjection projection)
     {
-        CuratorEntry entry = rpcManager.remove(projection.id);
+        CuratorEntry entry = connectionManager.remove(projection.id);
         if ( entry != null )
         {
             entry.close();
@@ -166,7 +166,7 @@ public class CuratorProjectionService
 
     private void addEvent(CuratorProjection projection, RpcCuratorEvent event)
     {
-        CuratorEntry entry = rpcManager.get(projection.id);
+        CuratorEntry entry = connectionManager.get(projection.id);
         if ( entry != null )
         {
             entry.addEvent(event);
@@ -202,7 +202,7 @@ public class CuratorProjectionService
 
     private CuratorEntry getEntry(CuratorProjection projection) throws Exception
     {
-        CuratorEntry entry = rpcManager.get(projection.id);
+        CuratorEntry entry = connectionManager.get(projection.id);
         if ( entry == null )
         {
             throw new Exception("No client found with id: " + projection.id);


Mime
View raw message