drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [5/5] drill git commit: DRILL-4297: Enable custom serializers and deserializers when using CustomTunnel
Date Fri, 05 Feb 2016 15:00:58 GMT
DRILL-4297: Enable custom serializers and deserializers when using CustomTunnel

- Adds support for customer serializers and deserializers
- Adds pre-built serializers and deserializers for Protobuf, Jackson, Protostuff (protobuf)
and Protostuff (json)


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e57c6542
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e57c6542
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e57c6542

Branch: refs/heads/master
Commit: e57c6542c20d2db37837c7a3e72700a8412ae822
Parents: 422c5a8
Author: Jacques Nadeau <jacques@apache.org>
Authored: Thu Feb 4 18:09:54 2016 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Fri Feb 5 07:00:32 2016 -0800

----------------------------------------------------------------------
 .../drill/exec/rpc/control/ControlTunnel.java   | 166 +++++++++++++++---
 .../drill/exec/rpc/control/Controller.java      |  42 ++++-
 .../drill/exec/rpc/control/ControllerImpl.java  |  17 +-
 .../exec/rpc/control/CustomHandlerRegistry.java |  50 +++---
 .../exec/rpc/control/TestCustomTunnel.java      | 167 ++++++++++++++++++-
 5 files changed, 393 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e57c6542/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index ff8be1d..ad6601c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -21,7 +21,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.drill.exec.proto.BitControl.CustomMessage;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -38,9 +37,24 @@ import org.apache.drill.exec.rpc.FutureBitCommand;
 import org.apache.drill.exec.rpc.ListeningCommand;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
-
+import org.apache.drill.exec.rpc.control.Controller.CustomSerDe;
+
+import com.dyuproject.protostuff.JsonIOUtil;
+import com.dyuproject.protostuff.LinkedBuffer;
+import com.dyuproject.protostuff.ProtobufIOUtil;
+import com.dyuproject.protostuff.ProtostuffIOUtil;
+import com.dyuproject.protostuff.Schema;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
+import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
 
@@ -188,11 +202,21 @@ public class ControlTunnel {
     }
   }
 
-  public <SEND extends Message, RECEIVE extends Message> CustomTunnel<SEND, RECEIVE>
getCustomTunnel(
+  @SuppressWarnings("unchecked")
+  public <SEND extends MessageLite, RECEIVE extends MessageLite> CustomTunnel<SEND,
RECEIVE> getCustomTunnel(
       int messageTypeId, Class<SEND> clazz, Parser<RECEIVE> parser) {
-    return new CustomTunnel<SEND, RECEIVE>(messageTypeId, parser);
+    return new CustomTunnel<SEND, RECEIVE>(
+        messageTypeId,
+        ((CustomSerDe<SEND>) new ProtoSerDe<Message>(null)),
+        new ProtoSerDe<RECEIVE>(parser));
+  }
+
+  public <SEND, RECEIVE> CustomTunnel<SEND, RECEIVE> getCustomTunnel(
+      int messageTypeId, CustomSerDe<SEND> send, CustomSerDe<RECEIVE> receive)
{
+    return new CustomTunnel<SEND, RECEIVE>(messageTypeId, send, receive);
   }
 
+
   private static class CustomMessageSender extends ListeningCommand<CustomMessage, ControlConnection>
{
 
     private CustomMessage message;
@@ -235,24 +259,24 @@ public class ControlTunnel {
    */
   public class CustomFuture<RECEIVE> {
 
-    private Parser<RECEIVE> parser;
-    private DrillRpcFuture<CustomMessage> future;
+    private final CustomSerDe<RECEIVE> serde;
+    private final DrillRpcFuture<CustomMessage> future;
 
-    public CustomFuture(Parser<RECEIVE> parser, DrillRpcFuture<CustomMessage>
future) {
+    public CustomFuture(CustomSerDe<RECEIVE> serde, DrillRpcFuture<CustomMessage>
future) {
       super();
-      this.parser = parser;
+      this.serde = serde;
       this.future = future;
     }
 
-    public RECEIVE get() throws RpcException, InvalidProtocolBufferException {
+    public RECEIVE get() throws Exception {
       CustomMessage message = future.checkedGet();
-      return parser.parseFrom(message.getMessage());
+      return serde.deserializeReceived(message.getMessage().toByteArray());
     }
 
-    public RECEIVE get(long timeout, TimeUnit unit) throws RpcException, TimeoutException,
+    public RECEIVE get(long timeout, TimeUnit unit) throws Exception,
         InvalidProtocolBufferException {
       CustomMessage message = future.checkedGet(timeout, unit);
-      return parser.parseFrom(message.getMessage());
+      return serde.deserializeReceived(message.getMessage().toByteArray());
     }
 
     public DrillBuf getBuffer() throws RpcException {
@@ -261,6 +285,7 @@ public class ControlTunnel {
 
   }
 
+
   /**
    * A special tunnel that can be used for custom types of messages. Its lifecycle is tied
to the underlying
    * ControlTunnel.
@@ -269,14 +294,16 @@ public class ControlTunnel {
    * @param <RECEIVE>
    *          The expected response the control tunnel expects to receive.
    */
-  public class CustomTunnel<SEND extends Message, RECEIVE extends Message> {
+  public class CustomTunnel<SEND, RECEIVE> {
     private int messageTypeId;
-    private Parser<RECEIVE> parser;
+    private CustomSerDe<SEND> send;
+    private CustomSerDe<RECEIVE> receive;
 
-    private CustomTunnel(int messageTypeId, Parser<RECEIVE> parser) {
+    private CustomTunnel(int messageTypeId, CustomSerDe<SEND> send, CustomSerDe<RECEIVE>
receive) {
       super();
       this.messageTypeId = messageTypeId;
-      this.parser = parser;
+      this.send = send;
+      this.receive = receive;
     }
 
     /**
@@ -289,13 +316,13 @@ public class ControlTunnel {
      */
     public CustomFuture<RECEIVE> send(SEND messageToSend, ByteBuf... dataBodies) {
       final CustomMessage customMessage = CustomMessage.newBuilder()
-          .setMessage(messageToSend.toByteString())
+          .setMessage(ByteString.copyFrom(send.serializeToSend(messageToSend)))
           .setType(messageTypeId)
           .build();
       final SyncCustomMessageSender b = new SyncCustomMessageSender(customMessage, dataBodies);
       manager.runCommand(b);
       DrillRpcFuture<CustomMessage> innerFuture = b.getFuture();
-      return new CustomFuture<RECEIVE>(parser, innerFuture);
+      return new CustomFuture<RECEIVE>(receive, innerFuture);
     }
 
     /**
@@ -309,7 +336,7 @@ public class ControlTunnel {
      */
     public void send(RpcOutcomeListener<RECEIVE> listener, SEND messageToSend, ByteBuf...
dataBodies) {
       final CustomMessage customMessage = CustomMessage.newBuilder()
-          .setMessage(messageToSend.toByteString())
+          .setMessage(ByteString.copyFrom(send.serializeToSend(messageToSend)))
           .setType(messageTypeId)
           .build();
       manager.runCommand(new CustomMessageSender(new CustomTunnelListener(listener), customMessage,
dataBodies));
@@ -331,9 +358,9 @@ public class ControlTunnel {
       @Override
       public void success(CustomMessage value, ByteBuf buffer) {
         try {
-          RECEIVE message = parser.parseFrom(value.getMessage());
+          RECEIVE message = receive.deserializeReceived(value.getMessage().toByteArray());
           innerListener.success(message, buffer);
-        } catch (InvalidProtocolBufferException e) {
+        } catch (Exception e) {
           innerListener.failed(new RpcException("Failure while parsing message locally.",
e));
         }
 
@@ -345,7 +372,104 @@ public class ControlTunnel {
       }
 
     }
+
+  }
+
+
+
+
+  public static class ProtoSerDe<MSG extends MessageLite> implements CustomSerDe<MSG>
{
+    private final Parser<MSG> parser;
+
+    ProtoSerDe(Parser<MSG> parser) {
+      this.parser = parser;
+    }
+
+    @Override
+    public byte[] serializeToSend(MSG send) {
+      return send.toByteArray();
+    }
+
+    @Override
+    public MSG deserializeReceived(byte[] bytes) throws Exception {
+      return parser.parseFrom(bytes);
+    }
+
+  }
+
+  public static class JacksonSerDe<MSG> implements CustomSerDe<MSG> {
+
+    private final ObjectWriter writer;
+    private final ObjectReader reader;
+
+    public JacksonSerDe(Class<MSG> clazz) {
+      ObjectMapper mapper = new ObjectMapper();
+      writer = mapper.writerFor(clazz);
+      reader = mapper.readerFor(clazz);
+    }
+
+    public JacksonSerDe(Class<MSG> clazz, JsonSerializer<MSG> serializer, JsonDeserializer<MSG>
deserializer) {
+      ObjectMapper mapper = new ObjectMapper();
+      SimpleModule module = new SimpleModule();
+      mapper.registerModule(module);
+      module.addSerializer(clazz, serializer);
+      module.addDeserializer(clazz, deserializer);
+      writer = mapper.writerFor(clazz);
+      reader = mapper.readerFor(clazz);
+    }
+
+    @Override
+    public byte[] serializeToSend(MSG send) {
+      try {
+        return writer.writeValueAsBytes(send);
+      } catch (JsonProcessingException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public MSG deserializeReceived(byte[] bytes) throws Exception {
+      return (MSG) reader.readValue(bytes);
+    }
+
   }
 
+  public static class ProtostuffBinarySerDe<MSG extends com.dyuproject.protostuff.Message<MSG>>
implements
+      CustomSerDe<MSG> {
+    private Schema<MSG> schema;
+
+    @Override
+    public byte[] serializeToSend(MSG send) {
+      final LinkedBuffer buffer = LinkedBuffer.allocate(512);
+      return ProtostuffIOUtil.toByteArray(send, schema, buffer);
+    }
 
+    @Override
+    public MSG deserializeReceived(byte[] bytes) throws Exception {
+      MSG msg = schema.newMessage();
+      ProtobufIOUtil.mergeFrom(bytes, msg, schema);
+      return msg;
+    }
+
+  }
+
+  public static class ProtostuffJsonSerDe<MSG extends com.dyuproject.protostuff.Message<MSG>>
implements
+      CustomSerDe<MSG> {
+    private Schema<MSG> schema;
+
+    @Override
+    public byte[] serializeToSend(MSG send) {
+      final LinkedBuffer buffer = LinkedBuffer.allocate(512);
+      return JsonIOUtil.toByteArray(send, schema, false, buffer);
+    }
+
+    @Override
+    public MSG deserializeReceived(byte[] bytes) throws Exception {
+      MSG msg = schema.newMessage();
+      JsonIOUtil.mergeFrom(bytes, msg, schema, false);
+      return msg;
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e57c6542/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
index d6b288c..a5f470c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
@@ -49,7 +49,6 @@ public interface Controller extends AutoCloseable {
   /**
    * Register a new handler for custom message types. Should be done before any messages.
This is threadsafe as this
    * method manages locking internally.
-   *
    * @param messageTypeId
    *          The type of message id to handle. This corresponds to the CustomMessage.type
field. Note that only a
    *          single handler for a particular type of message can be registered within a
particular Drillbit.
@@ -62,13 +61,32 @@ public interface Controller extends AutoCloseable {
       CustomMessageHandler<REQUEST, RESPONSE> handler, Parser<REQUEST> parser);
 
   /**
+   * Register a new handler for custom message types. Should be done before any messages.
This is threadsafe as this
+   * method manages locking internally.
+   * @param messageTypeId
+   *          The type of message id to handle. This corresponds to the CustomMessage.type
field. Note that only a
+   *          single handler for a particular type of message can be registered within a
particular Drillbit.
+   * @param handler
+   *          The handler that should be used to handle this type of message.
+   * @param requestSerde
+   *          CustomSerDe for incoming requests.
+   * @param responseSerde
+   *          CustomSerDe for serializing responses.
+   */
+  public <REQUEST, RESPONSE> void registerCustomHandler(int messageTypeId,
+      CustomMessageHandler<REQUEST, RESPONSE> handler,
+      CustomSerDe<REQUEST> requestSerde,
+      CustomSerDe<RESPONSE> responseSerde);
+
+  /**
    * Defines how the Controller should handle custom messages. Implementations need to be
threadsafe.
+   *
    * @param <REQUEST>
    *          The type of request message.
    * @param <RESPONSE>
    *          The type of the response message.
    */
-  public interface CustomMessageHandler<REQUEST extends MessageLite, RESPONSE extends
MessageLite> {
+  public interface CustomMessageHandler<REQUEST, RESPONSE> {
 
     /**
      * Handle an incoming message.
@@ -81,15 +99,18 @@ public interface Controller extends AutoCloseable {
      *           throw this exception if there is an RPC failure that should be communicated
to the sender.
      */
     public CustomResponse<RESPONSE> onMessage(REQUEST pBody, DrillBuf dBody) throws
UserRpcException;
+
   }
 
+
+
   /**
    * A simple interface that describes the nature of the response to the custom incoming
message.
    *
    * @param <RESPONSE>
    *          The type of message that the respopnse contains. Must be a protobuf message
type.
    */
-  public interface CustomResponse<RESPONSE extends MessageLite> {
+  public interface CustomResponse<RESPONSE> {
 
     /**
      * The structured portion of the response.
@@ -103,4 +124,19 @@ public interface Controller extends AutoCloseable {
      */
     public ByteBuf[] getBodies();
   }
+
+  /**
+   * Interface for defining how to serialize and deserialize custom message for consumer
who want to use something other
+   * than Protobuf messages.
+   *
+   * @param <SEND>
+   *          The class that is expected to be sent.
+   * @param <RECEIVE>
+   *          The class that is expected to received.
+   */
+  public interface CustomSerDe<MSG> {
+    public byte[] serializeToSend(MSG send);
+
+    public MSG deserializeReceived(byte[] bytes) throws Exception;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e57c6542/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
index ddc7778..482f117 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 
 import com.google.common.collect.Lists;
+import com.google.protobuf.Message;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
@@ -71,12 +72,26 @@ public class ControllerImpl implements Controller {
   }
 
 
+  @SuppressWarnings("unchecked")
   @Override
   public <REQUEST extends MessageLite, RESPONSE extends MessageLite> void registerCustomHandler(int
messageTypeId,
       CustomMessageHandler<REQUEST, RESPONSE> handler, Parser<REQUEST> parser)
{
-    handlerRegistry.registerCustomHandler(messageTypeId, handler, parser);
+    handlerRegistry.registerCustomHandler(
+        messageTypeId,
+        handler,
+        new ControlTunnel.ProtoSerDe<REQUEST>(parser),
+        (CustomSerDe<RESPONSE>) new ControlTunnel.ProtoSerDe<Message>(null));
   }
 
+  @Override
+  public <REQUEST, RESPONSE> void registerCustomHandler(int messageTypeId,
+      CustomMessageHandler<REQUEST, RESPONSE> handler,
+      CustomSerDe<REQUEST> requestSerde,
+      CustomSerDe<RESPONSE> responseSerde) {
+    handlerRegistry.registerCustomHandler(messageTypeId, handler, requestSerde, responseSerde);
+  }
+
+
   public void close() throws Exception {
     List<AutoCloseable> closeables = Lists.newArrayList();
     closeables.add(server);

http://git-wip-us.apache.org/repos/asf/drill/blob/e57c6542/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
index 7fc09fb..7a2bd04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
@@ -36,9 +36,6 @@ import org.apache.drill.exec.rpc.control.Controller.CustomResponse;
 import com.carrotsearch.hppc.IntObjectHashMap;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
 
 public class CustomHandlerRegistry {
   // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CustomHandlerRegistry.class);
@@ -46,7 +43,7 @@ public class CustomHandlerRegistry {
   private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
   private final AutoCloseableLock read = new AutoCloseableLock(readWriteLock.readLock());
   private final AutoCloseableLock write = new AutoCloseableLock(readWriteLock.writeLock());
-  private final IntObjectHashMap<ParsingHandler<?>> handlers = new IntObjectHashMap<>();
+  private final IntObjectHashMap<ParsingHandler<?, ?>> handlers = new IntObjectHashMap<>();
   private volatile DrillbitEndpoint endpoint;
 
   public CustomHandlerRegistry() {
@@ -56,13 +53,15 @@ public class CustomHandlerRegistry {
     this.endpoint = endpoint;
   }
 
-  public <SEND extends MessageLite> void registerCustomHandler(int messageTypeId,
-      CustomMessageHandler<SEND, ?> handler,
-      Parser<SEND> parser) {
+  public <REQUEST, RESPONSE> void registerCustomHandler(int messageTypeId,
+      CustomMessageHandler<REQUEST, RESPONSE> handler,
+      Controller.CustomSerDe<REQUEST> requestSerde,
+      Controller.CustomSerDe<RESPONSE> responseSerde) {
     Preconditions.checkNotNull(handler);
-    Preconditions.checkNotNull(parser);
+    Preconditions.checkNotNull(requestSerde);
+    Preconditions.checkNotNull(responseSerde);
     try (AutoCloseableLock lock = write.open()) {
-      ParsingHandler<?> parsingHandler = handlers.get(messageTypeId);
+      ParsingHandler<?, ?> parsingHandler = handlers.get(messageTypeId);
       if (parsingHandler != null) {
         throw new IllegalStateException(String.format(
             "Only one handler can be registered for a given custom message type. You tried
to register a handler for "
@@ -70,13 +69,13 @@ public class CustomHandlerRegistry {
             messageTypeId));
       }
 
-      parsingHandler = new ParsingHandler<SEND>(handler, parser);
+      parsingHandler = new ParsingHandler<REQUEST, RESPONSE>(handler, requestSerde,
responseSerde);
       handlers.put(messageTypeId, parsingHandler);
     }
   }
 
   public Response handle(CustomMessage message, DrillBuf dBody) throws RpcException {
-    final ParsingHandler<?> handler;
+    final ParsingHandler<?, ?> handler;
     try (AutoCloseableLock lock = read.open()) {
       handler = handlers.get(message.getType());
     }
@@ -89,8 +88,12 @@ public class CustomHandlerRegistry {
               message.getType())));
     }
     final CustomResponse<?> customResponse = handler.onMessage(message.getMessage(),
dBody);
+    @SuppressWarnings("unchecked")
     final CustomMessage responseMessage = CustomMessage.newBuilder()
-        .setMessage(customResponse.getMessage().toByteString())
+        .setMessage(
+            ByteString.copyFrom(((Controller.CustomSerDe<Object>) handler.getResponseSerDe())
+                .serializeToSend(customResponse
+                .getMessage())))
         .setType(message.getType())
         .build();
     // make sure we don't pass in a null array.
@@ -99,23 +102,32 @@ public class CustomHandlerRegistry {
 
   }
 
-  private class ParsingHandler<SEND extends MessageLite> {
-    private final CustomMessageHandler<SEND, ?> handler;
-    private final Parser<SEND> parser;
+  private class ParsingHandler<REQUEST, RESPONSE> {
+    private final CustomMessageHandler<REQUEST, ?> handler;
+    private final Controller.CustomSerDe<REQUEST> requestSerde;
+    private final Controller.CustomSerDe<RESPONSE> responseSerde;
 
-    public ParsingHandler(CustomMessageHandler<SEND, ?> handler, Parser<SEND>
parser) {
+    public ParsingHandler(
+        CustomMessageHandler<REQUEST, RESPONSE> handler,
+        Controller.CustomSerDe<REQUEST> requestSerde,
+        Controller.CustomSerDe<RESPONSE> responseSerde) {
       super();
       this.handler = handler;
-      this.parser = parser;
+      this.requestSerde = requestSerde;
+      this.responseSerde = responseSerde;
+    }
+
+    public Controller.CustomSerDe<RESPONSE> getResponseSerDe() {
+      return responseSerde;
     }
 
     public CustomResponse<?> onMessage(ByteString pBody, DrillBuf dBody) throws UserRpcException
{
 
       try {
-        final SEND message = parser.parseFrom(pBody);
+        final REQUEST message = requestSerde.deserializeReceived(pBody.toByteArray());
         return handler.onMessage(message, dBody);
 
-      } catch (InvalidProtocolBufferException e) {
+      } catch (Exception e) {
         throw new UserRpcException(endpoint, "Failure parsing message.", e);
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e57c6542/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
index 2008a48..9770a7e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
@@ -30,7 +30,6 @@ import java.util.Random;
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.UserRpcException;
 import org.apache.drill.exec.rpc.control.ControlTunnel.CustomFuture;
 import org.apache.drill.exec.rpc.control.ControlTunnel.CustomTunnel;
@@ -39,8 +38,6 @@ import org.apache.drill.exec.rpc.control.Controller.CustomResponse;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.junit.Test;
 
-import com.google.protobuf.InvalidProtocolBufferException;
-
 public class TestCustomTunnel extends BaseTestQuery {
 
   private final QueryId expectedId = QueryId
@@ -61,7 +58,7 @@ public class TestCustomTunnel extends BaseTestQuery {
   }
 
   @Test
-  public void ensureRoundTrip() throws RpcException, InvalidProtocolBufferException {
+  public void ensureRoundTrip() throws Exception {
 
     final DrillbitContext context = getDrillbitContext();
     final TestCustomMessageHandler handler = new TestCustomMessageHandler(context.getEndpoint(),
false);
@@ -74,7 +71,7 @@ public class TestCustomTunnel extends BaseTestQuery {
   }
 
   @Test
-  public void ensureRoundTripBytes() throws RpcException, InvalidProtocolBufferException
{
+  public void ensureRoundTripBytes() throws Exception {
     final DrillbitContext context = getDrillbitContext();
     final TestCustomMessageHandler handler = new TestCustomMessageHandler(context.getEndpoint(),
true);
     context.getController().registerCustomHandler(1002, handler, DrillbitEndpoint.PARSER);
@@ -90,6 +87,8 @@ public class TestCustomTunnel extends BaseTestQuery {
     assertTrue(Arrays.equals(expected, actual));
   }
 
+
+
   private class TestCustomMessageHandler implements CustomMessageHandler<DrillbitEndpoint,
QueryId> {
     private DrillbitEndpoint expectedValue;
     private final boolean returnBytes;
@@ -135,4 +134,162 @@ public class TestCustomTunnel extends BaseTestQuery {
       };
     }
   }
+
+  @Test
+  public void ensureRoundTripJackson() throws Exception {
+    final DrillbitContext context = getDrillbitContext();
+    final MesgA mesgA = new MesgA();
+    mesgA.fieldA = "123";
+    mesgA.fieldB = "okra";
+
+    final TestCustomMessageHandlerJackson handler = new TestCustomMessageHandlerJackson(mesgA);
+    context.getController().registerCustomHandler(1003, handler,
+        new ControlTunnel.JacksonSerDe<MesgA>(MesgA.class),
+        new ControlTunnel.JacksonSerDe<MesgB>(MesgB.class));
+    final ControlTunnel loopbackTunnel = context.getController().getTunnel(context.getEndpoint());
+    final CustomTunnel<MesgA, MesgB> tunnel = loopbackTunnel.getCustomTunnel(
+        1003,
+        new ControlTunnel.JacksonSerDe<MesgA>(MesgA.class),
+        new ControlTunnel.JacksonSerDe<MesgB>(MesgB.class));
+    CustomFuture<MesgB> future = tunnel.send(mesgA);
+    assertEquals(expectedB, future.get());
+  }
+
+  private MesgB expectedB = new MesgB().set("hello", "bye", "friend");
+
+  public static class MesgA {
+    public String fieldA;
+    public String fieldB;
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((fieldA == null) ? 0 : fieldA.hashCode());
+      result = prime * result + ((fieldB == null) ? 0 : fieldB.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      MesgA other = (MesgA) obj;
+      if (fieldA == null) {
+        if (other.fieldA != null) {
+          return false;
+        }
+      } else if (!fieldA.equals(other.fieldA)) {
+        return false;
+      }
+      if (fieldB == null) {
+        if (other.fieldB != null) {
+          return false;
+        }
+      } else if (!fieldB.equals(other.fieldB)) {
+        return false;
+      }
+      return true;
+    }
+
+  }
+
+  public static class MesgB {
+    public String fieldA;
+    public String fieldB;
+    public String fieldC;
+
+    public MesgB set(String a, String b, String c) {
+      fieldA = a;
+      fieldB = b;
+      fieldC = c;
+      return this;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((fieldA == null) ? 0 : fieldA.hashCode());
+      result = prime * result + ((fieldB == null) ? 0 : fieldB.hashCode());
+      result = prime * result + ((fieldC == null) ? 0 : fieldC.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      MesgB other = (MesgB) obj;
+      if (fieldA == null) {
+        if (other.fieldA != null) {
+          return false;
+        }
+      } else if (!fieldA.equals(other.fieldA)) {
+        return false;
+      }
+      if (fieldB == null) {
+        if (other.fieldB != null) {
+          return false;
+        }
+      } else if (!fieldB.equals(other.fieldB)) {
+        return false;
+      }
+      if (fieldC == null) {
+        if (other.fieldC != null) {
+          return false;
+        }
+      } else if (!fieldC.equals(other.fieldC)) {
+        return false;
+      }
+      return true;
+    }
+
+  }
+
+  private class TestCustomMessageHandlerJackson implements CustomMessageHandler<MesgA,
MesgB> {
+    private MesgA expectedValue;
+
+    public TestCustomMessageHandlerJackson(MesgA expectedValue) {
+      super();
+      this.expectedValue = expectedValue;
+    }
+
+    @Override
+    public CustomResponse<MesgB> onMessage(MesgA pBody, DrillBuf dBody) throws UserRpcException
{
+
+      if (!expectedValue.equals(pBody)) {
+        throw new UserRpcException(DrillbitEndpoint.getDefaultInstance(),
+            "Invalid expected downstream value.", new IllegalStateException());
+      }
+
+      return new CustomResponse<MesgB>() {
+
+        @Override
+        public MesgB getMessage() {
+          return expectedB;
+        }
+
+        @Override
+        public ByteBuf[] getBodies() {
+          return null;
+        }
+
+      };
+    }
+  }
 }


Mime
View raw message