curator-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From randg...@apache.org
Subject [12/14] git commit: wip
Date Tue, 27 May 2014 13:57:34 GMT
wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c8d49ae8
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c8d49ae8
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c8d49ae8

Branch: refs/heads/curator-rpc
Commit: c8d49ae877c0ea95302c1339563293f31c1fd5db
Parents: 7c99ddb
Author: randgalt <randgalt@apache.org>
Authored: Mon May 26 18:02:05 2014 -0500
Committer: randgalt <randgalt@apache.org>
Committed: Mon May 26 18:02:05 2014 -0500

----------------------------------------------------------------------
 curator-x-rpc/pom.xml                           |   6 +
 .../curator/x/rpc/CuratorProjectionServer.java  |   5 +-
 .../org/apache/curator/x/rpc/RpcManager.java    |   9 +
 .../curator/x/rpc/idl/event/EventService.java   |  36 +++-
 .../x/rpc/idl/event/RpcCuratorEvent.java        |  60 +++++-
 .../x/rpc/idl/event/RpcCuratorEventType.java    |   8 +-
 .../x/rpc/idl/projection/CreateSpec.java        |   4 +-
 .../projection/CuratorProjectionService.java    |  29 ++-
 curator-x-rpc/src/main/thrift/curator.thrift    |   6 +-
 .../apache/curator/generated/CreateSpec.java    |  37 ++--
 .../curator/generated/CuratorEventType.java     |  60 +++---
 .../apache/curator/generated/EventService.java  | 188 +++++++++++++++++--
 .../org/apache/curator/x/rpc/TestClient.java    |  29 ++-
 13 files changed, 410 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/curator-x-rpc/pom.xml b/curator-x-rpc/pom.xml
index 42de982..cf2442c 100644
--- a/curator-x-rpc/pom.xml
+++ b/curator-x-rpc/pom.xml
@@ -20,5 +20,11 @@
             <groupId>com.facebook.swift</groupId>
             <artifactId>swift-service</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
index 6fcc6d7..f5d5649 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
@@ -31,8 +31,9 @@ public class CuratorProjectionServer
 {
     public static void main(String[] args)
     {
-        EventService eventService = new EventService();
-        CuratorProjectionService projectionService = new CuratorProjectionService(eventService);
+        RpcManager rpcManager = new RpcManager();
+        EventService eventService = new EventService(rpcManager, 5000); // TODO
+        CuratorProjectionService projectionService = new CuratorProjectionService(rpcManager,
eventService);
         ThriftServiceProcessor processor = new ThriftServiceProcessor(new ThriftCodecManager(),
Lists.<ThriftEventHandler>newArrayList(), projectionService, eventService);
         ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8899));
 // TODO
         server.start();

http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
index 783297d..bdc5138 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
@@ -2,6 +2,7 @@ package org.apache.curator.x.rpc;
 
 import com.google.common.collect.Maps;
 import org.apache.curator.framework.CuratorFramework;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -56,4 +57,12 @@ public class RpcManager
         CuratorEntry entry = projections.remove(id);
         return (entry != null) ? entry.client : null;
     }
+
+    public void touch(List<String> ids)
+    {
+        for ( String id : ids )
+        {
+            getClient(id);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
index 206a347..3b7cde9 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
@@ -2,13 +2,28 @@ package org.apache.curator.x.rpc.idl.event;
 
 import com.facebook.swift.service.ThriftMethod;
 import com.facebook.swift.service.ThriftService;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
+import org.apache.curator.x.rpc.RpcManager;
+import org.apache.curator.x.rpc.idl.projection.CuratorProjection;
+import javax.annotation.Nullable;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 @ThriftService("EventService")
 public class EventService
 {
     private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue();
+    private final RpcManager rpcManager;
+    private final int pingTimeMs;
+
+    public EventService(RpcManager rpcManager, int pingTimeMs)
+    {
+        this.rpcManager = rpcManager;
+        this.pingTimeMs = pingTimeMs;
+    }
 
     public void addEvent(RpcCuratorEvent event)
     {
@@ -16,8 +31,25 @@ public class EventService
     }
 
     @ThriftMethod
-    public RpcCuratorEvent getNextEvent() throws InterruptedException
+    public RpcCuratorEvent getNextEvent(List<CuratorProjection> projections) throws
InterruptedException
     {
-        return events.take();
+        if ( projections != null )
+        {
+            List<String> ids = Lists.transform
+            (
+                projections,
+                new Function<CuratorProjection, String>()
+                {
+                    @Override
+                    public String apply(CuratorProjection projection)
+                    {
+                        return projection.id;
+                    }
+                }
+            );
+            rpcManager.touch(ids);
+        }
+        RpcCuratorEvent event = events.poll(pingTimeMs, TimeUnit.MILLISECONDS);
+        return (event != null) ? event : new RpcCuratorEvent();
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
index 6896e89..38a5ec3 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
@@ -24,6 +24,7 @@ import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.x.rpc.idl.projection.CuratorProjection;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -71,7 +72,17 @@ public class RpcCuratorEvent
 
     public RpcCuratorEvent()
     {
-        throw new UnsupportedOperationException();
+        this.projection = null;
+        this.type = RpcCuratorEventType.PING;
+        this.resultCode = 0;
+        this.path = null;
+        this.context = null;
+        this.stat = null;
+        this.data = null;
+        this.name = null;
+        this.children = null;
+        this.aclList = null;
+        this.watchedEvent = null;
     }
 
     public RpcCuratorEvent(CuratorProjection projection, CuratorEvent event)
@@ -89,6 +100,53 @@ public class RpcCuratorEvent
         this.watchedEvent = toRpcWatchedEvent(event.getWatchedEvent());
     }
 
+    public RpcCuratorEvent(CuratorProjection projection, ConnectionState newState)
+    {
+        this.projection = projection;
+        this.type = toRpcCuratorEventType(newState);
+        this.resultCode = 0;
+        this.path = null;
+        this.context = null;
+        this.stat = null;
+        this.data = null;
+        this.name = null;
+        this.children = null;
+        this.aclList = null;
+        this.watchedEvent = null;
+    }
+
+    private RpcCuratorEventType toRpcCuratorEventType(ConnectionState state)
+    {
+        switch ( state )
+        {
+            case CONNECTED:
+            {
+                return RpcCuratorEventType.CONNECTION_CONNECTED;
+            }
+
+            case SUSPENDED:
+            {
+                return RpcCuratorEventType.CONNECTION_SUSPENDED;
+            }
+
+            case RECONNECTED:
+            {
+                return RpcCuratorEventType.CONNECTION_RECONNECTED;
+            }
+
+            case LOST:
+            {
+                return RpcCuratorEventType.CONNECTION_LOST;
+            }
+
+            case READ_ONLY:
+            {
+                return RpcCuratorEventType.CONNECTION_READ_ONLY;
+            }
+        }
+        throw new IllegalStateException("Unknown state: " + state);
+    }
+
     private RpcCuratorEventType toRpcCuratorEventType(CuratorEventType eventType)
     {
         switch ( eventType )

http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
index f08aa4a..f8d6468 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
@@ -23,6 +23,7 @@ import com.facebook.swift.codec.ThriftEnum;
 @ThriftEnum("CuratorEventType")
 public enum RpcCuratorEventType
 {
+    PING,
     CREATE,
     DELETE,
     EXISTS,
@@ -33,5 +34,10 @@ public enum RpcCuratorEventType
     GET_ACL,
     SET_ACL,
     WATCHED,
-    CLOSING
+    CLOSING,
+    CONNECTION_CONNECTED,
+    CONNECTION_SUSPENDED,
+    CONNECTION_RECONNECTED,
+    CONNECTION_LOST,
+    CONNECTION_READ_ONLY
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java
b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java
index 8e7acf7..d451b26 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CreateSpec.java
@@ -28,7 +28,7 @@ public class CreateSpec
     public String path;
 
     @ThriftField(2)
-    public String data;
+    public byte[] data;
 
     @ThriftField(3)
     public CreateMode mode;
@@ -49,7 +49,7 @@ public class CreateSpec
     {
     }
 
-    public CreateSpec(String path, String data, CreateMode mode, boolean doAsync, boolean
compressed, boolean creatingParentsIfNeeded, boolean withProtection)
+    public CreateSpec(String path, byte[] data, CreateMode mode, boolean doAsync, boolean
compressed, boolean creatingParentsIfNeeded, boolean withProtection)
     {
         this.path = path;
         this.data = data;

http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
index 34eff59..bca32d9 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -29,6 +30,8 @@ import org.apache.curator.framework.api.CreateBuilder;
 import org.apache.curator.framework.api.CreateModable;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.PathAndBytesable;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.x.rpc.RpcManager;
 import org.apache.curator.x.rpc.idl.event.EventService;
@@ -38,11 +41,12 @@ import java.util.UUID;
 @ThriftService("CuratorService")
 public class CuratorProjectionService
 {
-    private final RpcManager rpcManager = new RpcManager();
+    private final RpcManager rpcManager;
     private final EventService eventService;
 
-    public CuratorProjectionService(EventService eventService)
+    public CuratorProjectionService(RpcManager rpcManager, EventService eventService)
     {
+        this.rpcManager = rpcManager;
         this.eventService = eventService;
     }
 
@@ -53,7 +57,19 @@ public class CuratorProjectionService
         String id = UUID.randomUUID().toString();
         client.start();
         rpcManager.add(id, client);
-        return new CuratorProjection(id);
+        final CuratorProjection projection = new CuratorProjection(id);
+
+        ConnectionStateListener listener = new ConnectionStateListener()
+        {
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                eventService.addEvent(new RpcCuratorEvent(projection, newState));
+            }
+        };
+        client.getConnectionStateListenable().addListener(listener);
+
+        return projection;
     }
 
     @ThriftMethod
@@ -84,7 +100,10 @@ public class CuratorProjectionService
         {
             builder = castBuilder(builder, CreateBuilder.class).withProtection();
         }
-        builder = castBuilder(builder, CreateModable.class).withMode(getRealMode(createSpec.mode));
+        if ( createSpec.mode != null )
+        {
+            builder = castBuilder(builder, CreateModable.class).withMode(getRealMode(createSpec.mode));
+        }
 
         if ( createSpec.doAsync )
         {
@@ -99,7 +118,7 @@ public class CuratorProjectionService
             builder = castBuilder(builder, Backgroundable.class).inBackground(backgroundCallback);
         }
 
-        return String.valueOf(castBuilder(builder, PathAndBytesable.class).forPath(createSpec.path,
createSpec.data.getBytes()));
+        return String.valueOf(castBuilder(builder, PathAndBytesable.class).forPath(createSpec.path,
createSpec.data));
     }
 
     private org.apache.zookeeper.CreateMode getRealMode(CreateMode mode)

http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/main/thrift/curator.thrift
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/thrift/curator.thrift b/curator-x-rpc/src/main/thrift/curator.thrift
index b9d5beb..100f456 100644
--- a/curator-x-rpc/src/main/thrift/curator.thrift
+++ b/curator-x-rpc/src/main/thrift/curator.thrift
@@ -8,7 +8,7 @@ enum CreateMode {
 }
 
 enum CuratorEventType {
-  CREATE, DELETE, EXISTS, GET_DATA, SET_DATA, CHILDREN, SYNC, GET_ACL, SET_ACL, WATCHED,
CLOSING
+  PING, CREATE, DELETE, EXISTS, GET_DATA, SET_DATA, CHILDREN, SYNC, GET_ACL, SET_ACL, WATCHED,
CLOSING, CONNECTION_CONNECTED, CONNECTION_SUSPENDED, CONNECTION_RECONNECTED, CONNECTION_LOST,
CONNECTION_READ_ONLY
 }
 
 enum EventType {
@@ -21,7 +21,7 @@ enum KeeperState {
 
 struct CreateSpec {
   1: string path;
-  2: string data;
+  2: binary data;
   3: CreateMode mode;
   4: bool doAsync;
   5: bool compressed;
@@ -87,5 +87,5 @@ service CuratorService {
 }
 
 service EventService {
-  CuratorEvent getNextEvent();
+  CuratorEvent getNextEvent(1: list<CuratorProjection> projections);
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java
index 67437d3..4488285 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/CreateSpec.java
@@ -50,7 +50,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec,
CreateSpe
   }
 
   public String path; // required
-  public String data; // required
+  public ByteBuffer data; // required
   /**
    * 
    * @see CreateMode
@@ -153,7 +153,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec,
CreateSpe
     tmpMap.put(_Fields.PATH, new org.apache.thrift.meta_data.FieldMetaData("path", org.apache.thrift.TFieldRequirementType.DEFAULT,

         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT,

-        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING
       , true)));
     tmpMap.put(_Fields.MODE, new org.apache.thrift.meta_data.FieldMetaData("mode", org.apache.thrift.TFieldRequirementType.DEFAULT,

         new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM,
CreateMode.class)));
     tmpMap.put(_Fields.DO_ASYNC, new org.apache.thrift.meta_data.FieldMetaData("doAsync",
org.apache.thrift.TFieldRequirementType.DEFAULT, 
@@ -173,7 +173,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec,
CreateSpe
 
   public CreateSpec(
     String path,
-    String data,
+    ByteBuffer data,
     CreateMode mode,
     boolean doAsync,
     boolean compressed,
@@ -203,7 +203,8 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec,
CreateSpe
       this.path = other.path;
     }
     if (other.isSetData()) {
-      this.data = other.data;
+      this.data = org.apache.thrift.TBaseHelper.copyBinary(other.data);
+;
     }
     if (other.isSetMode()) {
       this.mode = other.mode;
@@ -257,11 +258,21 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec,
CreateSpe
     }
   }
 
-  public String getData() {
-    return this.data;
+  public byte[] getData() {
+    setData(org.apache.thrift.TBaseHelper.rightSize(data));
+    return data == null ? null : data.array();
+  }
+
+  public ByteBuffer bufferForData() {
+    return data;
+  }
+
+  public CreateSpec setData(byte[] data) {
+    setData(data == null ? (ByteBuffer)null : ByteBuffer.wrap(data));
+    return this;
   }
 
-  public CreateSpec setData(String data) {
+  public CreateSpec setData(ByteBuffer data) {
     this.data = data;
     return this;
   }
@@ -419,7 +430,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec,
CreateSpe
       if (value == null) {
         unsetData();
       } else {
-        setData((String)value);
+        setData((ByteBuffer)value);
       }
       break;
 
@@ -712,7 +723,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec,
CreateSpe
     if (this.data == null) {
       sb.append("null");
     } else {
-      sb.append(this.data);
+      org.apache.thrift.TBaseHelper.toString(this.data, sb);
     }
     first = false;
     if (!first) sb.append(", ");
@@ -794,7 +805,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec,
CreateSpe
             break;
           case 2: // DATA
             if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
-              struct.data = iprot.readString();
+              struct.data = iprot.readBinary();
               struct.setDataIsSet(true);
             } else { 
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
@@ -862,7 +873,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec,
CreateSpe
       }
       if (struct.data != null) {
         oprot.writeFieldBegin(DATA_FIELD_DESC);
-        oprot.writeString(struct.data);
+        oprot.writeBinary(struct.data);
         oprot.writeFieldEnd();
       }
       if (struct.mode != null) {
@@ -926,7 +937,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec,
CreateSpe
         oprot.writeString(struct.path);
       }
       if (struct.isSetData()) {
-        oprot.writeString(struct.data);
+        oprot.writeBinary(struct.data);
       }
       if (struct.isSetMode()) {
         oprot.writeI32(struct.mode.getValue());
@@ -954,7 +965,7 @@ public class CreateSpec implements org.apache.thrift.TBase<CreateSpec,
CreateSpe
         struct.setPathIsSet(true);
       }
       if (incoming.get(1)) {
-        struct.data = iprot.readString();
+        struct.data = iprot.readBinary();
         struct.setDataIsSet(true);
       }
       if (incoming.get(2)) {

http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
index 60350fc..ce31158 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
@@ -12,17 +12,23 @@ import java.util.HashMap;
 import org.apache.thrift.TEnum;
 
 public enum CuratorEventType implements org.apache.thrift.TEnum {
-  CREATE(0),
-  DELETE(1),
-  EXISTS(2),
-  GET_DATA(3),
-  SET_DATA(4),
-  CHILDREN(5),
-  SYNC(6),
-  GET_ACL(7),
-  SET_ACL(8),
-  WATCHED(9),
-  CLOSING(10);
+  PING(0),
+  CREATE(1),
+  DELETE(2),
+  EXISTS(3),
+  GET_DATA(4),
+  SET_DATA(5),
+  CHILDREN(6),
+  SYNC(7),
+  GET_ACL(8),
+  SET_ACL(9),
+  WATCHED(10),
+  CLOSING(11),
+  CONNECTION_CONNECTED(12),
+  CONNECTION_SUSPENDED(13),
+  CONNECTION_RECONNECTED(14),
+  CONNECTION_LOST(15),
+  CONNECTION_READ_ONLY(16);
 
   private final int value;
 
@@ -44,27 +50,39 @@ public enum CuratorEventType implements org.apache.thrift.TEnum {
   public static CuratorEventType findByValue(int value) { 
     switch (value) {
       case 0:
-        return CREATE;
+        return PING;
       case 1:
-        return DELETE;
+        return CREATE;
       case 2:
-        return EXISTS;
+        return DELETE;
       case 3:
-        return GET_DATA;
+        return EXISTS;
       case 4:
-        return SET_DATA;
+        return GET_DATA;
       case 5:
-        return CHILDREN;
+        return SET_DATA;
       case 6:
-        return SYNC;
+        return CHILDREN;
       case 7:
-        return GET_ACL;
+        return SYNC;
       case 8:
-        return SET_ACL;
+        return GET_ACL;
       case 9:
-        return WATCHED;
+        return SET_ACL;
       case 10:
+        return WATCHED;
+      case 11:
         return CLOSING;
+      case 12:
+        return CONNECTION_CONNECTED;
+      case 13:
+        return CONNECTION_SUSPENDED;
+      case 14:
+        return CONNECTION_RECONNECTED;
+      case 15:
+        return CONNECTION_LOST;
+      case 16:
+        return CONNECTION_READ_ONLY;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java
index 7c8e294..7b41888 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/EventService.java
@@ -36,13 +36,13 @@ public class EventService {
 
   public interface Iface {
 
-    public CuratorEvent getNextEvent() throws org.apache.thrift.TException;
+    public CuratorEvent getNextEvent(List<CuratorProjection> projections) throws org.apache.thrift.TException;
 
   }
 
   public interface AsyncIface {
 
-    public void getNextEvent(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws
org.apache.thrift.TException;
+    public void getNextEvent(List<CuratorProjection> projections, org.apache.thrift.async.AsyncMethodCallback
resultHandler) throws org.apache.thrift.TException;
 
   }
 
@@ -66,15 +66,16 @@ public class EventService {
       super(iprot, oprot);
     }
 
-    public CuratorEvent getNextEvent() throws org.apache.thrift.TException
+    public CuratorEvent getNextEvent(List<CuratorProjection> projections) throws org.apache.thrift.TException
     {
-      send_getNextEvent();
+      send_getNextEvent(projections);
       return recv_getNextEvent();
     }
 
-    public void send_getNextEvent() throws org.apache.thrift.TException
+    public void send_getNextEvent(List<CuratorProjection> projections) throws org.apache.thrift.TException
     {
       getNextEvent_args args = new getNextEvent_args();
+      args.setProjections(projections);
       sendBase("getNextEvent", args);
     }
 
@@ -106,21 +107,24 @@ public class EventService {
       super(protocolFactory, clientManager, transport);
     }
 
-    public void getNextEvent(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws
org.apache.thrift.TException {
+    public void getNextEvent(List<CuratorProjection> projections, org.apache.thrift.async.AsyncMethodCallback
resultHandler) throws org.apache.thrift.TException {
       checkReady();
-      getNextEvent_call method_call = new getNextEvent_call(resultHandler, this, ___protocolFactory,
___transport);
+      getNextEvent_call method_call = new getNextEvent_call(projections, resultHandler, this,
___protocolFactory, ___transport);
       this.___currentMethod = method_call;
       ___manager.call(method_call);
     }
 
     public static class getNextEvent_call extends org.apache.thrift.async.TAsyncMethodCall
{
-      public getNextEvent_call(org.apache.thrift.async.AsyncMethodCallback resultHandler,
org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory,
org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException
{
+      private List<CuratorProjection> projections;
+      public getNextEvent_call(List<CuratorProjection> projections, org.apache.thrift.async.AsyncMethodCallback
resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory
protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException
{
         super(client, protocolFactory, transport, resultHandler, false);
+        this.projections = projections;
       }
 
       public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException
{
         prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("getNextEvent", org.apache.thrift.protocol.TMessageType.CALL,
0));
         getNextEvent_args args = new getNextEvent_args();
+        args.setProjections(projections);
         args.write(prot);
         prot.writeMessageEnd();
       }
@@ -167,7 +171,7 @@ public class EventService {
 
       public getNextEvent_result getResult(I iface, getNextEvent_args args) throws org.apache.thrift.TException
{
         getNextEvent_result result = new getNextEvent_result();
-        result.success = iface.getNextEvent();
+        result.success = iface.getNextEvent(args.projections);
         return result;
       }
     }
@@ -236,7 +240,7 @@ public class EventService {
       }
 
       public void start(I iface, getNextEvent_args args, org.apache.thrift.async.AsyncMethodCallback<CuratorEvent>
resultHandler) throws TException {
-        iface.getNextEvent(resultHandler);
+        iface.getNextEvent(args.projections,resultHandler);
       }
     }
 
@@ -245,6 +249,7 @@ public class EventService {
   public static class getNextEvent_args implements org.apache.thrift.TBase<getNextEvent_args,
getNextEvent_args._Fields>, java.io.Serializable, Cloneable, Comparable<getNextEvent_args>
  {
     private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getNextEvent_args");
 
+    private static final org.apache.thrift.protocol.TField PROJECTIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("projections",
org.apache.thrift.protocol.TType.LIST, (short)1);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes
= new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
@@ -252,10 +257,11 @@ public class EventService {
       schemes.put(TupleScheme.class, new getNextEvent_argsTupleSchemeFactory());
     }
 
+    public List<CuratorProjection> projections; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding
and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-;
+      PROJECTIONS((short)1, "projections");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -270,6 +276,8 @@ public class EventService {
        */
       public static _Fields findByThriftId(int fieldId) {
         switch(fieldId) {
+          case 1: // PROJECTIONS
+            return PROJECTIONS;
           default:
             return null;
         }
@@ -308,9 +316,14 @@ public class EventService {
         return _fieldName;
       }
     }
+
+    // isset id assignments
     public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
     static {
       Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields,
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.PROJECTIONS, new org.apache.thrift.meta_data.FieldMetaData("projections",
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,

+              new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
CuratorProjection.class))));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
       org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getNextEvent_args.class,
metaDataMap);
     }
@@ -318,10 +331,24 @@ public class EventService {
     public getNextEvent_args() {
     }
 
+    public getNextEvent_args(
+      List<CuratorProjection> projections)
+    {
+      this();
+      this.projections = projections;
+    }
+
     /**
      * Performs a deep copy on <i>other</i>.
      */
     public getNextEvent_args(getNextEvent_args other) {
+      if (other.isSetProjections()) {
+        List<CuratorProjection> __this__projections = new ArrayList<CuratorProjection>(other.projections.size());
+        for (CuratorProjection other_element : other.projections) {
+          __this__projections.add(new CuratorProjection(other_element));
+        }
+        this.projections = __this__projections;
+      }
     }
 
     public getNextEvent_args deepCopy() {
@@ -330,15 +357,66 @@ public class EventService {
 
     @Override
     public void clear() {
+      this.projections = null;
+    }
+
+    public int getProjectionsSize() {
+      return (this.projections == null) ? 0 : this.projections.size();
+    }
+
+    public java.util.Iterator<CuratorProjection> getProjectionsIterator() {
+      return (this.projections == null) ? null : this.projections.iterator();
+    }
+
+    public void addToProjections(CuratorProjection elem) {
+      if (this.projections == null) {
+        this.projections = new ArrayList<CuratorProjection>();
+      }
+      this.projections.add(elem);
+    }
+
+    public List<CuratorProjection> getProjections() {
+      return this.projections;
+    }
+
+    public getNextEvent_args setProjections(List<CuratorProjection> projections) {
+      this.projections = projections;
+      return this;
+    }
+
+    public void unsetProjections() {
+      this.projections = null;
+    }
+
+    /** Returns true if field projections is set (has been assigned a value) and false otherwise
*/
+    public boolean isSetProjections() {
+      return this.projections != null;
+    }
+
+    public void setProjectionsIsSet(boolean value) {
+      if (!value) {
+        this.projections = null;
+      }
     }
 
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
+      case PROJECTIONS:
+        if (value == null) {
+          unsetProjections();
+        } else {
+          setProjections((List<CuratorProjection>)value);
+        }
+        break;
+
       }
     }
 
     public Object getFieldValue(_Fields field) {
       switch (field) {
+      case PROJECTIONS:
+        return getProjections();
+
       }
       throw new IllegalStateException();
     }
@@ -350,6 +428,8 @@ public class EventService {
       }
 
       switch (field) {
+      case PROJECTIONS:
+        return isSetProjections();
       }
       throw new IllegalStateException();
     }
@@ -367,6 +447,15 @@ public class EventService {
       if (that == null)
         return false;
 
+      boolean this_present_projections = true && this.isSetProjections();
+      boolean that_present_projections = true && that.isSetProjections();
+      if (this_present_projections || that_present_projections) {
+        if (!(this_present_projections && that_present_projections))
+          return false;
+        if (!this.projections.equals(that.projections))
+          return false;
+      }
+
       return true;
     }
 
@@ -383,6 +472,16 @@ public class EventService {
 
       int lastComparison = 0;
 
+      lastComparison = Boolean.valueOf(isSetProjections()).compareTo(other.isSetProjections());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetProjections()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.projections, other.projections);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -403,6 +502,13 @@ public class EventService {
       StringBuilder sb = new StringBuilder("getNextEvent_args(");
       boolean first = true;
 
+      sb.append("projections:");
+      if (this.projections == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.projections);
+      }
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -446,6 +552,25 @@ public class EventService {
             break;
           }
           switch (schemeField.id) {
+            case 1: // PROJECTIONS
+              if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+                {
+                  org.apache.thrift.protocol.TList _list16 = iprot.readListBegin();
+                  struct.projections = new ArrayList<CuratorProjection>(_list16.size);
+                  for (int _i17 = 0; _i17 < _list16.size; ++_i17)
+                  {
+                    CuratorProjection _elem18;
+                    _elem18 = new CuratorProjection();
+                    _elem18.read(iprot);
+                    struct.projections.add(_elem18);
+                  }
+                  iprot.readListEnd();
+                }
+                struct.setProjectionsIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
             default:
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
           }
@@ -461,6 +586,18 @@ public class EventService {
         struct.validate();
 
         oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.projections != null) {
+          oprot.writeFieldBegin(PROJECTIONS_FIELD_DESC);
+          {
+            oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT,
struct.projections.size()));
+            for (CuratorProjection _iter19 : struct.projections)
+            {
+              _iter19.write(oprot);
+            }
+            oprot.writeListEnd();
+          }
+          oprot.writeFieldEnd();
+        }
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
@@ -478,11 +615,40 @@ public class EventService {
       @Override
       public void write(org.apache.thrift.protocol.TProtocol prot, getNextEvent_args struct)
throws org.apache.thrift.TException {
         TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.isSetProjections()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.isSetProjections()) {
+          {
+            oprot.writeI32(struct.projections.size());
+            for (CuratorProjection _iter20 : struct.projections)
+            {
+              _iter20.write(oprot);
+            }
+          }
+        }
       }
 
       @Override
       public void read(org.apache.thrift.protocol.TProtocol prot, getNextEvent_args struct)
throws org.apache.thrift.TException {
         TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          {
+            org.apache.thrift.protocol.TList _list21 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT,
iprot.readI32());
+            struct.projections = new ArrayList<CuratorProjection>(_list21.size);
+            for (int _i22 = 0; _i22 < _list21.size; ++_i22)
+            {
+              CuratorProjection _elem23;
+              _elem23 = new CuratorProjection();
+              _elem23.read(iprot);
+              struct.projections.add(_elem23);
+            }
+          }
+          struct.setProjectionsIsSet(true);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/c8d49ae8/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
index e479031..79cea30 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
@@ -18,21 +18,26 @@
  */
 package org.apache.curator.x.rpc;
 
+import org.apache.curator.generated.CreateSpec;
 import org.apache.curator.generated.CuratorEvent;
 import org.apache.curator.generated.CuratorProjection;
 import org.apache.curator.generated.CuratorProjectionSpec;
 import org.apache.curator.generated.CuratorService;
 import org.apache.curator.generated.EventService;
+import org.apache.curator.test.TestingServer;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TSocket;
+import java.util.Arrays;
 import java.util.concurrent.Executors;
 
 public class TestClient
 {
-    public static void main(String[] args) throws TException
+    public static void main(String[] args) throws Exception
     {
+        new TestingServer(2181);
+
         TSocket clientTransport = new TSocket("localhost", 8899);
         clientTransport.open();
         TProtocol clientProtocol = new TBinaryProtocol(clientTransport);
@@ -43,24 +48,36 @@ public class TestClient
         TProtocol eventProtocol = new TBinaryProtocol(eventTransport);
         final EventService.Client serviceClient = new EventService.Client(eventProtocol);
 
+        final CuratorProjection curatorProjection = client.newCuratorProjection(new CuratorProjectionSpec());
+
         Executors.newSingleThreadExecutor().submit
-        (new Runnable()
+        (
+            new Runnable()
             {
                 @Override
                 public void run()
                 {
                     try
                     {
-                        CuratorEvent nextEvent = serviceClient.getNextEvent();
-                        System.out.println(nextEvent.type);
+                        //noinspection InfiniteLoopStatement
+                        for(;;)
+                        {
+                            CuratorEvent nextEvent = serviceClient.getNextEvent(Arrays.asList(curatorProjection));
+                            System.out.println(nextEvent.type);
+                        }
                     }
                     catch ( TException e )
                     {
                         e.printStackTrace();
                     }
                 }
-            });
+            }
+        );
 
-        CuratorProjection curatorProjection = client.newCuratorProjection(new CuratorProjectionSpec());
+        CreateSpec createSpec = new CreateSpec();
+        createSpec.path = "/a/b/c";
+        createSpec.creatingParentsIfNeeded = true;
+        String path = client.create(curatorProjection, createSpec);
+        System.out.println(path);
     }
 }


Mime
View raw message