Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 66D5518EB1 for ; Mon, 11 Jan 2016 06:44:49 +0000 (UTC) Received: (qmail 90356 invoked by uid 500); 11 Jan 2016 06:44:49 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 90319 invoked by uid 500); 11 Jan 2016 06:44:49 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 90258 invoked by uid 99); 11 Jan 2016 06:44:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Jan 2016 06:44:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 20AEBE00AC; Mon, 11 Jan 2016 06:44:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jacques@apache.org To: commits@drill.apache.org Date: Mon, 11 Jan 2016 06:44:50 -0000 Message-Id: <504ab1ec64cf43c79d26b85ee5d0b072@git.apache.org> In-Reply-To: <9555844d70f64685b7d5c1d2a29c0e03@git.apache.org> References: <9555844d70f64685b7d5c1d2a29c0e03@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] drill git commit: DRILL-4238: Add a custom RPC interface on the Control channel for extensible communication between bits. DRILL-4238: Add a custom RPC interface on the Control channel for extensible communication between bits. This closes #313. Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/67d5cc6d Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/67d5cc6d Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/67d5cc6d Branch: refs/heads/master Commit: 67d5cc6df0ca394dd26ff5c0a2b0510779bad949 Parents: 467c405 Author: Jacques Nadeau Authored: Sat Jan 2 14:55:21 2016 -0800 Committer: Jacques Nadeau Committed: Sun Jan 10 22:41:50 2016 -0800 ---------------------------------------------------------------------- .../exec/rpc/control/ControlRpcConfig.java | 2 + .../drill/exec/rpc/control/ControlTunnel.java | 174 +++++- .../drill/exec/rpc/control/Controller.java | 63 ++ .../drill/exec/rpc/control/ControllerImpl.java | 13 + .../exec/rpc/control/CustomHandlerRegistry.java | 124 ++++ .../rpc/control/DefaultInstanceHandler.java | 4 +- .../exec/work/batch/ControlMessageHandler.java | 14 + .../exec/rpc/control/TestCustomTunnel.java | 138 +++++ .../org/apache/drill/exec/proto/BitControl.java | 599 +++++++++++++++++-- .../drill/exec/proto/SchemaBitControl.java | 119 ++++ .../drill/exec/proto/beans/CustomMessage.java | 186 ++++++ protocol/src/main/protobuf/BitControl.proto | 7 + 12 files changed, 1397 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java index 44046c9..ec09a98 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java @@ -22,6 +22,7 @@ import java.util.concurrent.Executor; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.proto.BitControl.BitControlHandshake; +import org.apache.drill.exec.proto.BitControl.CustomMessage; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.InitializeFragments; @@ -50,6 +51,7 @@ public class ControlRpcConfig { .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class) .add(RpcType.REQ_UNPAUSE_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class) + .add(RpcType.REQ_CUSTOM, CustomMessage.class, RpcType.RESP_CUSTOM, CustomMessage.class) .build(); } http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java index b90912d..ff8be1d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java @@ -17,6 +17,13 @@ */ package org.apache.drill.exec.rpc.control; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.drill.exec.proto.BitControl.CustomMessage; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.InitializeFragments; @@ -26,12 +33,16 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryProfile; -import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.DrillRpcFuture; import org.apache.drill.exec.rpc.FutureBitCommand; import org.apache.drill.exec.rpc.ListeningCommand; +import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; + public class ControlTunnel { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlTunnel.class); @@ -176,4 +187,165 @@ public class ControlTunnel { connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, Ack.class); } } + + public CustomTunnel getCustomTunnel( + int messageTypeId, Class clazz, Parser parser) { + return new CustomTunnel(messageTypeId, parser); + } + + private static class CustomMessageSender extends ListeningCommand { + + private CustomMessage message; + private ByteBuf[] dataBodies; + + public CustomMessageSender(RpcOutcomeListener listener, CustomMessage message, ByteBuf[] dataBodies) { + super(listener); + this.message = message; + this.dataBodies = dataBodies; + } + + @Override + public void doRpcCall(RpcOutcomeListener outcomeListener, ControlConnection connection) { + connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies); + } + + } + + private static class SyncCustomMessageSender extends FutureBitCommand { + + private CustomMessage message; + private ByteBuf[] dataBodies; + + public SyncCustomMessageSender(CustomMessage message, ByteBuf[] dataBodies) { + super(); + this.message = message; + this.dataBodies = dataBodies; + } + + @Override + public void doRpcCall(RpcOutcomeListener outcomeListener, ControlConnection connection) { + connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies); + } + } + + /** + * A class used to return a synchronous future when doing custom rpc messages. + * @param + * The type of message that will be returned. + */ + public class CustomFuture { + + private Parser parser; + private DrillRpcFuture future; + + public CustomFuture(Parser parser, DrillRpcFuture future) { + super(); + this.parser = parser; + this.future = future; + } + + public RECEIVE get() throws RpcException, InvalidProtocolBufferException { + CustomMessage message = future.checkedGet(); + return parser.parseFrom(message.getMessage()); + } + + public RECEIVE get(long timeout, TimeUnit unit) throws RpcException, TimeoutException, + InvalidProtocolBufferException { + CustomMessage message = future.checkedGet(timeout, unit); + return parser.parseFrom(message.getMessage()); + } + + public DrillBuf getBuffer() throws RpcException { + return (DrillBuf) future.getBuffer(); + } + + } + + /** + * A special tunnel that can be used for custom types of messages. Its lifecycle is tied to the underlying + * ControlTunnel. + * @param + * The type of message the control tunnel will be able to send. + * @param + * The expected response the control tunnel expects to receive. + */ + public class CustomTunnel { + private int messageTypeId; + private Parser parser; + + private CustomTunnel(int messageTypeId, Parser parser) { + super(); + this.messageTypeId = messageTypeId; + this.parser = parser; + } + + /** + * Send a message and receive a future for monitoring the outcome. + * @param messageToSend + * The structured message to send. + * @param dataBodies + * One or more optional unstructured messages to append to the structure message. + * @return The CustomFuture that can be used to wait for the response. + */ + public CustomFuture send(SEND messageToSend, ByteBuf... dataBodies) { + final CustomMessage customMessage = CustomMessage.newBuilder() + .setMessage(messageToSend.toByteString()) + .setType(messageTypeId) + .build(); + final SyncCustomMessageSender b = new SyncCustomMessageSender(customMessage, dataBodies); + manager.runCommand(b); + DrillRpcFuture innerFuture = b.getFuture(); + return new CustomFuture(parser, innerFuture); + } + + /** + * Send a message using a custom listener. + * @param listener + * The listener to inform of the outcome of the sent message. + * @param messageToSend + * The structured message to send. + * @param dataBodies + * One or more optional unstructured messages to append to the structure message. + */ + public void send(RpcOutcomeListener listener, SEND messageToSend, ByteBuf... dataBodies) { + final CustomMessage customMessage = CustomMessage.newBuilder() + .setMessage(messageToSend.toByteString()) + .setType(messageTypeId) + .build(); + manager.runCommand(new CustomMessageSender(new CustomTunnelListener(listener), customMessage, dataBodies)); + } + + private class CustomTunnelListener implements RpcOutcomeListener { + final RpcOutcomeListener innerListener; + + public CustomTunnelListener(RpcOutcomeListener innerListener) { + super(); + this.innerListener = innerListener; + } + + @Override + public void failed(RpcException ex) { + innerListener.failed(ex); + } + + @Override + public void success(CustomMessage value, ByteBuf buffer) { + try { + RECEIVE message = parser.parseFrom(value.getMessage()); + innerListener.success(message, buffer); + } catch (InvalidProtocolBufferException e) { + innerListener.failed(new RpcException("Failure while parsing message locally.", e)); + } + + } + + @Override + public void interrupted(InterruptedException e) { + innerListener.interrupted(e); + } + + } + } + + } http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java index af60ff0..94df739 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java @@ -17,10 +17,17 @@ */ package org.apache.drill.exec.rpc.control; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; + import java.io.Closeable; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.rpc.UserRpcException; + +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; /** * Service that allows one Drillbit to communicate with another. Internally manages whether each particular bit is a @@ -41,5 +48,61 @@ public interface Controller extends Closeable { public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException; + /** + * Register a new handler for custom message types. Should be done before any messages. This is threadsafe as this + * method manages locking internally. + * + * @param messageTypeId + * The type of message id to handle. This corresponds to the CustomMessage.type field. Note that only a + * single handler for a particular type of message can be registered within a particular Drillbit. + * @param handler + * The handler that should be used to handle this type of message. + * @param parser + * The parser used to handle the types of messages the handler above handles. + */ + public void registerCustomHandler(int messageTypeId, + CustomMessageHandler handler, Parser parser); + + /** + * Defines how the Controller should handle custom messages. Implementations need to be threadsafe. + * @param + * The type of request message. + * @param + * The type of the response message. + */ + public interface CustomMessageHandler { + + /** + * Handle an incoming message. + * @param pBody + * The protobuf body message object of type REQUEST that was sent. + * @param dBody + * An optional byte body that was sent along with the structured message. + * @return The response that should be sent to the message sender. + * @throws UserRpcException + * throw this exception if there is an RPC failure that should be communicated to the sender. + */ + public CustomResponse onMessage(REQUEST pBody, DrillBuf dBody) throws UserRpcException; + } + + /** + * A simple interface that describes the nature of the response to the custom incoming message. + * + * @param + * The type of message that the respopnse contains. Must be a protobuf message type. + */ + public interface CustomResponse { + + /** + * The structured portion of the response. + * @return A protobuf message of type RESPONSE + */ + public RESPONSE getMessage(); + /** + * The optional unstructured portion of the message. + * @return null or one or more unstructured bodies. + */ + public ByteBuf[] getBodies(); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java index 631d479..0564cca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java @@ -24,6 +24,8 @@ import org.apache.drill.exec.server.BootStrapContext; import org.apache.drill.exec.work.batch.ControlMessageHandler; import com.google.common.io.Closeables; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; /** * Manages communication tunnels between nodes. @@ -36,6 +38,7 @@ public class ControllerImpl implements Controller { private final BootStrapContext context; private final ConnectionManagerRegistry connectionRegistry; private final boolean allowPortHunting; + private final CustomHandlerRegistry handlerRegistry; public ControllerImpl(BootStrapContext context, ControlMessageHandler handler, boolean allowPortHunting) { super(); @@ -43,6 +46,7 @@ public class ControllerImpl implements Controller { this.context = context; this.connectionRegistry = new ConnectionManagerRegistry(handler, context); this.allowPortHunting = allowPortHunting; + this.handlerRegistry = handler.getHandlerRegistry(); } @Override @@ -52,6 +56,7 @@ public class ControllerImpl implements Controller { port = server.bind(port, allowPortHunting); DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setControlPort(port).build(); connectionRegistry.setEndpoint(completeEndpoint); + handlerRegistry.setEndpoint(completeEndpoint); return completeEndpoint; } @@ -60,6 +65,13 @@ public class ControllerImpl implements Controller { return new ControlTunnel(endpoint, connectionRegistry.getConnectionManager(endpoint)); } + + @Override + public void registerCustomHandler(int messageTypeId, + CustomMessageHandler handler, Parser parser) { + handlerRegistry.registerCustomHandler(messageTypeId, handler, parser); + } + public void close() { Closeables.closeQuietly(server); for (ControlConnectionManager bt : connectionRegistry) { @@ -67,4 +79,5 @@ public class ControllerImpl implements Controller { } } + } http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java new file mode 100644 index 0000000..c328cd8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.control; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; + +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.drill.common.concurrent.AutoCloseableLock; +import org.apache.drill.exec.proto.BitControl.CustomMessage; +import org.apache.drill.exec.proto.BitControl.RpcType; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.rpc.Response; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.UserRpcException; +import org.apache.drill.exec.rpc.control.Controller.CustomMessageHandler; +import org.apache.drill.exec.rpc.control.Controller.CustomResponse; + +import com.carrotsearch.hppc.IntObjectOpenHashMap; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; + +public class CustomHandlerRegistry { + // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CustomHandlerRegistry.class); + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final AutoCloseableLock read = new AutoCloseableLock(readWriteLock.readLock()); + private final AutoCloseableLock write = new AutoCloseableLock(readWriteLock.writeLock()); + private final IntObjectOpenHashMap> handlers = new IntObjectOpenHashMap<>(); + private volatile DrillbitEndpoint endpoint; + + public CustomHandlerRegistry() { + } + + public void setEndpoint(DrillbitEndpoint endpoint) { + this.endpoint = endpoint; + } + + public void registerCustomHandler(int messageTypeId, + CustomMessageHandler handler, + Parser parser) { + Preconditions.checkNotNull(handler); + Preconditions.checkNotNull(parser); + try (AutoCloseableLock lock = write.open()) { + ParsingHandler parsingHandler = handlers.get(messageTypeId); + if (parsingHandler != null) { + throw new IllegalStateException(String.format( + "Only one handler can be registered for a given custom message type. You tried to register a handler for " + + "the %d message type but one had already been registered.", + messageTypeId)); + } + + parsingHandler = new ParsingHandler(handler, parser); + handlers.put(messageTypeId, parsingHandler); + } + } + + public Response handle(CustomMessage message, DrillBuf dBody) throws RpcException { + final ParsingHandler handler; + try (AutoCloseableLock lock = read.open()) { + handler = handlers.get(message.getType()); + } + + if (handler == null) { + throw new UserRpcException( + endpoint, "Unable to handle message.", + new IllegalStateException(String.format( + "Unable to handle message. The message type provided [%d] did not have a registered handler.", + message.getType()))); + } + final CustomResponse customResponse = handler.onMessage(message.getMessage(), dBody); + final CustomMessage responseMessage = CustomMessage.newBuilder() + .setMessage(customResponse.getMessage().toByteString()) + .setType(message.getType()) + .build(); + // make sure we don't pass in a null array. + final ByteBuf[] dBodies = customResponse.getBodies() == null ? new DrillBuf[0] : customResponse.getBodies(); + return new Response(RpcType.RESP_CUSTOM, responseMessage, dBodies); + + } + + private class ParsingHandler { + private final CustomMessageHandler handler; + private final Parser parser; + + public ParsingHandler(CustomMessageHandler handler, Parser parser) { + super(); + this.handler = handler; + this.parser = parser; + } + + public CustomResponse onMessage(ByteString pBody, DrillBuf dBody) throws UserRpcException { + + try { + final SEND message = parser.parseFrom(pBody); + return handler.onMessage(message, dBody); + + } catch (InvalidProtocolBufferException e) { + throw new UserRpcException(endpoint, "Failure parsing message.", e); + } + + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java index 10fe343..7065201 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc.control; import org.apache.drill.exec.proto.BitControl.BitControlHandshake; import org.apache.drill.exec.proto.BitControl.BitStatus; +import org.apache.drill.exec.proto.BitControl.CustomMessage; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.RpcType; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; @@ -46,7 +47,8 @@ public class DefaultInstanceHandler { return BitStatus.getDefaultInstance(); case RpcType.RESP_QUERY_STATUS_VALUE: return QueryProfile.getDefaultInstance(); - + case RpcType.RESP_CUSTOM_VALUE: + return CustomMessage.getDefaultInstance(); default: throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java index 1c0eb80..77c069b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java @@ -19,8 +19,10 @@ package org.apache.drill.exec.work.batch; import static org.apache.drill.exec.rpc.RpcBus.get; import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.proto.BitControl.CustomMessage; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.InitializeFragments; @@ -39,6 +41,7 @@ import org.apache.drill.exec.rpc.UserRpcException; import org.apache.drill.exec.rpc.control.ControlConnection; import org.apache.drill.exec.rpc.control.ControlRpcConfig; import org.apache.drill.exec.rpc.control.ControlTunnel; +import org.apache.drill.exec.rpc.control.CustomHandlerRegistry; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.foreman.Foreman; @@ -50,6 +53,7 @@ import org.apache.drill.exec.work.fragment.NonRootFragmentManager; public class ControlMessageHandler { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class); private final WorkerBee bee; + private final CustomHandlerRegistry handlerRegistry = new CustomHandlerRegistry(); public ControlMessageHandler(final WorkerBee bee) { this.bee = bee; @@ -69,6 +73,11 @@ public class ControlMessageHandler { return ControlRpcConfig.OK; } + case RpcType.REQ_CUSTOM_VALUE: { + final CustomMessage customMessage = get(pBody, CustomMessage.PARSER); + return handlerRegistry.handle(customMessage, (DrillBuf) dBody); + } + case RpcType.REQ_RECEIVER_FINISHED_VALUE: { final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER); receivingFragmentFinished(finishedReceiver); @@ -228,4 +237,9 @@ public class ControlMessageHandler { return Acks.OK; } + + public CustomHandlerRegistry getHandlerRegistry() { + return handlerRegistry; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java new file mode 100644 index 0000000..2008a48 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.control; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.internal.ThreadLocalRandom; + +import java.util.Arrays; +import java.util.Random; + +import org.apache.drill.BaseTestQuery; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.UserRpcException; +import org.apache.drill.exec.rpc.control.ControlTunnel.CustomFuture; +import org.apache.drill.exec.rpc.control.ControlTunnel.CustomTunnel; +import org.apache.drill.exec.rpc.control.Controller.CustomMessageHandler; +import org.apache.drill.exec.rpc.control.Controller.CustomResponse; +import org.apache.drill.exec.server.DrillbitContext; +import org.junit.Test; + +import com.google.protobuf.InvalidProtocolBufferException; + +public class TestCustomTunnel extends BaseTestQuery { + + private final QueryId expectedId = QueryId + .newBuilder() + .setPart1(ThreadLocalRandom.current().nextLong()) + .setPart2(ThreadLocalRandom.current().nextLong()) + .build(); + + private final ByteBuf buf1; + private final byte[] expected; + + public TestCustomTunnel() { + buf1 = UnpooledByteBufAllocator.DEFAULT.buffer(1024); + Random r = new Random(); + this.expected = new byte[1024]; + r.nextBytes(expected); + buf1.writeBytes(expected); + } + + @Test + public void ensureRoundTrip() throws RpcException, InvalidProtocolBufferException { + + final DrillbitContext context = getDrillbitContext(); + final TestCustomMessageHandler handler = new TestCustomMessageHandler(context.getEndpoint(), false); + context.getController().registerCustomHandler(1001, handler, DrillbitEndpoint.PARSER); + final ControlTunnel loopbackTunnel = context.getController().getTunnel(context.getEndpoint()); + final CustomTunnel tunnel = loopbackTunnel.getCustomTunnel(1001, DrillbitEndpoint.class, + QueryId.PARSER); + CustomFuture future = tunnel.send(context.getEndpoint()); + assertEquals(expectedId, future.get()); + } + + @Test + public void ensureRoundTripBytes() throws RpcException, InvalidProtocolBufferException { + final DrillbitContext context = getDrillbitContext(); + final TestCustomMessageHandler handler = new TestCustomMessageHandler(context.getEndpoint(), true); + context.getController().registerCustomHandler(1002, handler, DrillbitEndpoint.PARSER); + final ControlTunnel loopbackTunnel = context.getController().getTunnel(context.getEndpoint()); + final CustomTunnel tunnel = loopbackTunnel.getCustomTunnel(1002, DrillbitEndpoint.class, + QueryId.PARSER); + buf1.retain(); + CustomFuture future = tunnel.send(context.getEndpoint(), buf1); + assertEquals(expectedId, future.get()); + byte[] actual = new byte[1024]; + future.getBuffer().getBytes(0, actual); + future.getBuffer().release(); + assertTrue(Arrays.equals(expected, actual)); + } + + private class TestCustomMessageHandler implements CustomMessageHandler { + private DrillbitEndpoint expectedValue; + private final boolean returnBytes; + + public TestCustomMessageHandler(DrillbitEndpoint expectedValue, boolean returnBytes) { + super(); + this.expectedValue = expectedValue; + this.returnBytes = returnBytes; + } + + @Override + public CustomResponse onMessage(DrillbitEndpoint pBody, DrillBuf dBody) throws UserRpcException { + + if (!expectedValue.equals(pBody)) { + throw new UserRpcException(expectedValue, "Invalid expected downstream value.", new IllegalStateException()); + } + + if (returnBytes) { + byte[] actual = new byte[1024]; + dBody.getBytes(0, actual); + if (!Arrays.equals(expected, actual)) { + throw new UserRpcException(expectedValue, "Invalid expected downstream value.", new IllegalStateException()); + } + } + + return new CustomResponse() { + + @Override + public QueryId getMessage() { + return expectedId; + } + + @Override + public ByteBuf[] getBodies() { + if (returnBytes) { + buf1.retain(); + return new ByteBuf[] { buf1 }; + } else { + return null; + } + } + + }; + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java index 493036b..b16934d 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java @@ -99,25 +99,33 @@ public final class BitControl { */ REQ_UNPAUSE_FRAGMENT(10, 16), /** + * REQ_CUSTOM = 17; + */ + REQ_CUSTOM(11, 17), + /** * RESP_FRAGMENT_HANDLE = 11; * *
      * bit responses
      * 
*/ - RESP_FRAGMENT_HANDLE(11, 11), + RESP_FRAGMENT_HANDLE(12, 11), /** * RESP_FRAGMENT_STATUS = 12; */ - RESP_FRAGMENT_STATUS(12, 12), + RESP_FRAGMENT_STATUS(13, 12), /** * RESP_BIT_STATUS = 13; */ - RESP_BIT_STATUS(13, 13), + RESP_BIT_STATUS(14, 13), /** * RESP_QUERY_STATUS = 14; */ - RESP_QUERY_STATUS(14, 14), + RESP_QUERY_STATUS(15, 14), + /** + * RESP_CUSTOM = 18; + */ + RESP_CUSTOM(16, 18), ; /** @@ -185,6 +193,10 @@ public final class BitControl { */ public static final int REQ_UNPAUSE_FRAGMENT_VALUE = 16; /** + * REQ_CUSTOM = 17; + */ + public static final int REQ_CUSTOM_VALUE = 17; + /** * RESP_FRAGMENT_HANDLE = 11; * *
@@ -204,6 +216,10 @@ public final class BitControl {
      * RESP_QUERY_STATUS = 14;
      */
     public static final int RESP_QUERY_STATUS_VALUE = 14;
+    /**
+     * RESP_CUSTOM = 18;
+     */
+    public static final int RESP_CUSTOM_VALUE = 18;
 
 
     public final int getNumber() { return value; }
@@ -221,10 +237,12 @@ public final class BitControl {
         case 10: return REQ_QUERY_STATUS;
         case 15: return REQ_QUERY_CANCEL;
         case 16: return REQ_UNPAUSE_FRAGMENT;
+        case 17: return REQ_CUSTOM;
         case 11: return RESP_FRAGMENT_HANDLE;
         case 12: return RESP_FRAGMENT_STATUS;
         case 13: return RESP_BIT_STATUS;
         case 14: return RESP_QUERY_STATUS;
+        case 18: return RESP_CUSTOM;
         default: return null;
       }
     }
@@ -3001,6 +3019,486 @@ public final class BitControl {
     // @@protoc_insertion_point(class_scope:exec.bit.control.InitializeFragments)
   }
 
+  public interface CustomMessageOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional int32 type = 1;
+    /**
+     * optional int32 type = 1;
+     */
+    boolean hasType();
+    /**
+     * optional int32 type = 1;
+     */
+    int getType();
+
+    // optional bytes message = 2;
+    /**
+     * optional bytes message = 2;
+     */
+    boolean hasMessage();
+    /**
+     * optional bytes message = 2;
+     */
+    com.google.protobuf.ByteString getMessage();
+  }
+  /**
+   * Protobuf type {@code exec.bit.control.CustomMessage}
+   */
+  public static final class CustomMessage extends
+      com.google.protobuf.GeneratedMessage
+      implements CustomMessageOrBuilder {
+    // Use CustomMessage.newBuilder() to construct.
+    private CustomMessage(com.google.protobuf.GeneratedMessage.Builder builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private CustomMessage(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final CustomMessage defaultInstance;
+    public static CustomMessage getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public CustomMessage getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private CustomMessage(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              type_ = input.readInt32();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              message_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.apache.drill.exec.proto.BitControl.CustomMessage.class, org.apache.drill.exec.proto.BitControl.CustomMessage.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser PARSER =
+        new com.google.protobuf.AbstractParser() {
+      public CustomMessage parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new CustomMessage(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // optional int32 type = 1;
+    public static final int TYPE_FIELD_NUMBER = 1;
+    private int type_;
+    /**
+     * optional int32 type = 1;
+     */
+    public boolean hasType() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * optional int32 type = 1;
+     */
+    public int getType() {
+      return type_;
+    }
+
+    // optional bytes message = 2;
+    public static final int MESSAGE_FIELD_NUMBER = 2;
+    private com.google.protobuf.ByteString message_;
+    /**
+     * optional bytes message = 2;
+     */
+    public boolean hasMessage() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * optional bytes message = 2;
+     */
+    public com.google.protobuf.ByteString getMessage() {
+      return message_;
+    }
+
+    private void initFields() {
+      type_ = 0;
+      message_ = com.google.protobuf.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt32(1, type_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, message_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(1, type_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, message_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.drill.exec.proto.BitControl.CustomMessage parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.drill.exec.proto.BitControl.CustomMessage prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code exec.bit.control.CustomMessage}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder
+       implements org.apache.drill.exec.proto.BitControl.CustomMessageOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.apache.drill.exec.proto.BitControl.CustomMessage.class, org.apache.drill.exec.proto.BitControl.CustomMessage.Builder.class);
+      }
+
+      // Construct using org.apache.drill.exec.proto.BitControl.CustomMessage.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        type_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        message_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.drill.exec.proto.BitControl.internal_static_exec_bit_control_CustomMessage_descriptor;
+      }
+
+      public org.apache.drill.exec.proto.BitControl.CustomMessage getDefaultInstanceForType() {
+        return org.apache.drill.exec.proto.BitControl.CustomMessage.getDefaultInstance();
+      }
+
+      public org.apache.drill.exec.proto.BitControl.CustomMessage build() {
+        org.apache.drill.exec.proto.BitControl.CustomMessage result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.apache.drill.exec.proto.BitControl.CustomMessage buildPartial() {
+        org.apache.drill.exec.proto.BitControl.CustomMessage result = new org.apache.drill.exec.proto.BitControl.CustomMessage(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.type_ = type_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.message_ = message_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.drill.exec.proto.BitControl.CustomMessage) {
+          return mergeFrom((org.apache.drill.exec.proto.BitControl.CustomMessage)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.apache.drill.exec.proto.BitControl.CustomMessage other) {
+        if (other == org.apache.drill.exec.proto.BitControl.CustomMessage.getDefaultInstance()) return this;
+        if (other.hasType()) {
+          setType(other.getType());
+        }
+        if (other.hasMessage()) {
+          setMessage(other.getMessage());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.drill.exec.proto.BitControl.CustomMessage parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.apache.drill.exec.proto.BitControl.CustomMessage) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // optional int32 type = 1;
+      private int type_ ;
+      /**
+       * optional int32 type = 1;
+       */
+      public boolean hasType() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * optional int32 type = 1;
+       */
+      public int getType() {
+        return type_;
+      }
+      /**
+       * optional int32 type = 1;
+       */
+      public Builder setType(int value) {
+        bitField0_ |= 0x00000001;
+        type_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * optional int32 type = 1;
+       */
+      public Builder clearType() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        type_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // optional bytes message = 2;
+      private com.google.protobuf.ByteString message_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * optional bytes message = 2;
+       */
+      public boolean hasMessage() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * optional bytes message = 2;
+       */
+      public com.google.protobuf.ByteString getMessage() {
+        return message_;
+      }
+      /**
+       * optional bytes message = 2;
+       */
+      public Builder setMessage(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        message_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * optional bytes message = 2;
+       */
+      public Builder clearMessage() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        message_ = getDefaultInstance().getMessage();
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:exec.bit.control.CustomMessage)
+    }
+
+    static {
+      defaultInstance = new CustomMessage(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:exec.bit.control.CustomMessage)
+  }
+
   public interface PlanFragmentOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
@@ -8440,6 +8938,11 @@ public final class BitControl {
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_exec_bit_control_InitializeFragments_fieldAccessorTable;
   private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_exec_bit_control_CustomMessage_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_exec_bit_control_CustomMessage_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
     internal_static_exec_bit_control_PlanFragment_descriptor;
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -8485,41 +8988,43 @@ public final class BitControl {
       "red.MinorFragmentProfile\022(\n\006handle\030\002 \001(\013",
       "2\030.exec.bit.FragmentHandle\"G\n\023Initialize" +
       "Fragments\0220\n\010fragment\030\001 \003(\0132\036.exec.bit.c" +
-      "ontrol.PlanFragment\"\374\003\n\014PlanFragment\022(\n\006" +
-      "handle\030\001 \001(\0132\030.exec.bit.FragmentHandle\022\024" +
-      "\n\014network_cost\030\004 \001(\002\022\020\n\010cpu_cost\030\005 \001(\002\022\021" +
-      "\n\tdisk_cost\030\006 \001(\002\022\023\n\013memory_cost\030\007 \001(\002\022\025" +
-      "\n\rfragment_json\030\010 \001(\t\022\025\n\rleaf_fragment\030\t" +
-      " \001(\010\022*\n\nassignment\030\n \001(\0132\026.exec.Drillbit" +
-      "Endpoint\022\'\n\007foreman\030\013 \001(\0132\026.exec.Drillbi" +
-      "tEndpoint\022\035\n\013mem_initial\030\014 \001(\003:\01020000000",
-      "\022\033\n\007mem_max\030\r \001(\003:\n2000000000\0221\n\013credent" +
-      "ials\030\016 \001(\0132\034.exec.shared.UserCredentials" +
-      "\022\024\n\014options_json\030\017 \001(\t\022:\n\007context\030\020 \001(\0132" +
-      ").exec.bit.control.QueryContextInformati" +
-      "on\022.\n\tcollector\030\021 \003(\0132\033.exec.bit.control" +
-      ".Collector\"\210\001\n\tCollector\022\"\n\032opposite_maj" +
-      "or_fragment_id\030\001 \001(\005\022#\n\027incoming_minor_f" +
-      "ragment\030\002 \003(\005B\002\020\001\022\035\n\025supports_out_of_ord" +
-      "er\030\003 \001(\010\022\023\n\013is_spooling\030\004 \001(\010\"c\n\027QueryCo" +
-      "ntextInformation\022\030\n\020query_start_time\030\001 \001",
-      "(\003\022\021\n\ttime_zone\030\002 \001(\005\022\033\n\023default_schema_" +
-      "name\030\003 \001(\t\"f\n\017WorkQueueStatus\022(\n\010endpoin" +
-      "t\030\001 \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014queue" +
-      "_length\030\002 \001(\005\022\023\n\013report_time\030\003 \001(\003\"h\n\020Fi" +
-      "nishedReceiver\022*\n\010receiver\030\001 \001(\0132\030.exec." +
-      "bit.FragmentHandle\022(\n\006sender\030\002 \001(\0132\030.exe" +
-      "c.bit.FragmentHandle*\323\002\n\007RpcType\022\r\n\tHAND" +
-      "SHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\034\n\030REQ_INI" +
-      "TIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_CANCEL_FRAGME" +
-      "NT\020\006\022\031\n\025REQ_RECEIVER_FINISHED\020\007\022\027\n\023REQ_F",
-      "RAGMENT_STATUS\020\010\022\022\n\016REQ_BIT_STATUS\020\t\022\024\n\020" +
-      "REQ_QUERY_STATUS\020\n\022\024\n\020REQ_QUERY_CANCEL\020\017" +
-      "\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022\030\n\024RESP_FRAGM" +
-      "ENT_HANDLE\020\013\022\030\n\024RESP_FRAGMENT_STATUS\020\014\022\023" +
-      "\n\017RESP_BIT_STATUS\020\r\022\025\n\021RESP_QUERY_STATUS" +
-      "\020\016B+\n\033org.apache.drill.exec.protoB\nBitCo" +
-      "ntrolH\001"
+      "ontrol.PlanFragment\".\n\rCustomMessage\022\014\n\004" +
+      "type\030\001 \001(\005\022\017\n\007message\030\002 \001(\014\"\374\003\n\014PlanFrag" +
+      "ment\022(\n\006handle\030\001 \001(\0132\030.exec.bit.Fragment" +
+      "Handle\022\024\n\014network_cost\030\004 \001(\002\022\020\n\010cpu_cost" +
+      "\030\005 \001(\002\022\021\n\tdisk_cost\030\006 \001(\002\022\023\n\013memory_cost" +
+      "\030\007 \001(\002\022\025\n\rfragment_json\030\010 \001(\t\022\025\n\rleaf_fr" +
+      "agment\030\t \001(\010\022*\n\nassignment\030\n \001(\0132\026.exec." +
+      "DrillbitEndpoint\022\'\n\007foreman\030\013 \001(\0132\026.exec",
+      ".DrillbitEndpoint\022\035\n\013mem_initial\030\014 \001(\003:\010" +
+      "20000000\022\033\n\007mem_max\030\r \001(\003:\n2000000000\0221\n" +
+      "\013credentials\030\016 \001(\0132\034.exec.shared.UserCre" +
+      "dentials\022\024\n\014options_json\030\017 \001(\t\022:\n\007contex" +
+      "t\030\020 \001(\0132).exec.bit.control.QueryContextI" +
+      "nformation\022.\n\tcollector\030\021 \003(\0132\033.exec.bit" +
+      ".control.Collector\"\210\001\n\tCollector\022\"\n\032oppo" +
+      "site_major_fragment_id\030\001 \001(\005\022#\n\027incoming" +
+      "_minor_fragment\030\002 \003(\005B\002\020\001\022\035\n\025supports_ou" +
+      "t_of_order\030\003 \001(\010\022\023\n\013is_spooling\030\004 \001(\010\"c\n",
+      "\027QueryContextInformation\022\030\n\020query_start_" +
+      "time\030\001 \001(\003\022\021\n\ttime_zone\030\002 \001(\005\022\033\n\023default" +
+      "_schema_name\030\003 \001(\t\"f\n\017WorkQueueStatus\022(\n" +
+      "\010endpoint\030\001 \001(\0132\026.exec.DrillbitEndpoint\022" +
+      "\024\n\014queue_length\030\002 \001(\005\022\023\n\013report_time\030\003 \001" +
+      "(\003\"h\n\020FinishedReceiver\022*\n\010receiver\030\001 \001(\013" +
+      "2\030.exec.bit.FragmentHandle\022(\n\006sender\030\002 \001" +
+      "(\0132\030.exec.bit.FragmentHandle*\364\002\n\007RpcType" +
+      "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\034\n" +
+      "\030REQ_INITIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_CANCE",
+      "L_FRAGMENT\020\006\022\031\n\025REQ_RECEIVER_FINISHED\020\007\022" +
+      "\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ_BIT_STAT" +
+      "US\020\t\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020REQ_QUERY_" +
+      "CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022\016\n\nRE" +
+      "Q_CUSTOM\020\021\022\030\n\024RESP_FRAGMENT_HANDLE\020\013\022\030\n\024" +
+      "RESP_FRAGMENT_STATUS\020\014\022\023\n\017RESP_BIT_STATU" +
+      "S\020\r\022\025\n\021RESP_QUERY_STATUS\020\016\022\017\n\013RESP_CUSTO" +
+      "M\020\022B+\n\033org.apache.drill.exec.protoB\nBitC" +
+      "ontrolH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -8550,32 +9055,38 @@ public final class BitControl {
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_InitializeFragments_descriptor,
               new java.lang.String[] { "Fragment", });
-          internal_static_exec_bit_control_PlanFragment_descriptor =
+          internal_static_exec_bit_control_CustomMessage_descriptor =
             getDescriptor().getMessageTypes().get(4);
+          internal_static_exec_bit_control_CustomMessage_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_exec_bit_control_CustomMessage_descriptor,
+              new java.lang.String[] { "Type", "Message", });
+          internal_static_exec_bit_control_PlanFragment_descriptor =
+            getDescriptor().getMessageTypes().get(5);
           internal_static_exec_bit_control_PlanFragment_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_PlanFragment_descriptor,
               new java.lang.String[] { "Handle", "NetworkCost", "CpuCost", "DiskCost", "MemoryCost", "FragmentJson", "LeafFragment", "Assignment", "Foreman", "MemInitial", "MemMax", "Credentials", "OptionsJson", "Context", "Collector", });
           internal_static_exec_bit_control_Collector_descriptor =
-            getDescriptor().getMessageTypes().get(5);
+            getDescriptor().getMessageTypes().get(6);
           internal_static_exec_bit_control_Collector_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_Collector_descriptor,
               new java.lang.String[] { "OppositeMajorFragmentId", "IncomingMinorFragment", "SupportsOutOfOrder", "IsSpooling", });
           internal_static_exec_bit_control_QueryContextInformation_descriptor =
-            getDescriptor().getMessageTypes().get(6);
+            getDescriptor().getMessageTypes().get(7);
           internal_static_exec_bit_control_QueryContextInformation_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_QueryContextInformation_descriptor,
               new java.lang.String[] { "QueryStartTime", "TimeZone", "DefaultSchemaName", });
           internal_static_exec_bit_control_WorkQueueStatus_descriptor =
-            getDescriptor().getMessageTypes().get(7);
+            getDescriptor().getMessageTypes().get(8);
           internal_static_exec_bit_control_WorkQueueStatus_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_WorkQueueStatus_descriptor,
               new java.lang.String[] { "Endpoint", "QueueLength", "ReportTime", });
           internal_static_exec_bit_control_FinishedReceiver_descriptor =
-            getDescriptor().getMessageTypes().get(8);
+            getDescriptor().getMessageTypes().get(9);
           internal_static_exec_bit_control_FinishedReceiver_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_FinishedReceiver_descriptor,

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java
index a4088c9..0a9b90d 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java
@@ -499,6 +499,125 @@ public final class SchemaBitControl
         }
     }
 
+    public static final class CustomMessage
+    {
+        public static final org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.MessageSchema WRITE =
+            new org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.MessageSchema();
+        public static final org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.BuilderSchema MERGE =
+            new org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.BuilderSchema();
+        
+        public static class MessageSchema implements com.dyuproject.protostuff.Schema
+        {
+            public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.exec.proto.BitControl.CustomMessage message) throws java.io.IOException
+            {
+                if(message.hasType())
+                    output.writeInt32(1, message.getType(), false);
+                if(message.hasMessage())
+                    output.writeByteArray(2, message.getMessage().toByteArray(), false);
+
+            }
+            public boolean isInitialized(org.apache.drill.exec.proto.BitControl.CustomMessage message)
+            {
+                return message.isInitialized();
+            }
+            public java.lang.String getFieldName(int number)
+            {
+                return org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.getFieldName(number);
+            }
+            public int getFieldNumber(java.lang.String name)
+            {
+                return org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.getFieldNumber(name);
+            }
+            public java.lang.Class typeClass()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.class;
+            }
+            public java.lang.String messageName()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.class.getSimpleName();
+            }
+            public java.lang.String messageFullName()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.class.getName();
+            }
+            //unused
+            public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.exec.proto.BitControl.CustomMessage message) throws java.io.IOException {}
+            public org.apache.drill.exec.proto.BitControl.CustomMessage newMessage() { return null; }
+        }
+        public static class BuilderSchema implements com.dyuproject.protostuff.Schema
+        {
+            public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.exec.proto.BitControl.CustomMessage.Builder builder) throws java.io.IOException
+            {
+                for(int number = input.readFieldNumber(this);; number = input.readFieldNumber(this))
+                {
+                    switch(number)
+                    {
+                        case 0:
+                            return;
+                        case 1:
+                            builder.setType(input.readInt32());
+                            break;
+                        case 2:
+                            builder.setMessage(com.google.protobuf.ByteString.copyFrom(input.readByteArray()));
+                            break;
+                        default:
+                            input.handleUnknownField(number, this);
+                    }
+                }
+            }
+            public boolean isInitialized(org.apache.drill.exec.proto.BitControl.CustomMessage.Builder builder)
+            {
+                return builder.isInitialized();
+            }
+            public org.apache.drill.exec.proto.BitControl.CustomMessage.Builder newMessage()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.newBuilder();
+            }
+            public java.lang.String getFieldName(int number)
+            {
+                return org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.getFieldName(number);
+            }
+            public int getFieldNumber(java.lang.String name)
+            {
+                return org.apache.drill.exec.proto.SchemaBitControl.CustomMessage.getFieldNumber(name);
+            }
+            public java.lang.Class typeClass()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.Builder.class;
+            }
+            public java.lang.String messageName()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.class.getSimpleName();
+            }
+            public java.lang.String messageFullName()
+            {
+                return org.apache.drill.exec.proto.BitControl.CustomMessage.class.getName();
+            }
+            //unused
+            public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.exec.proto.BitControl.CustomMessage.Builder builder) throws java.io.IOException {}
+        }
+        public static java.lang.String getFieldName(int number)
+        {
+            switch(number)
+            {
+                case 1: return "type";
+                case 2: return "message";
+                default: return null;
+            }
+        }
+        public static int getFieldNumber(java.lang.String name)
+        {
+            java.lang.Integer number = fieldMap.get(name);
+            return number == null ? 0 : number.intValue();
+        }
+        private static final java.util.HashMap fieldMap = new java.util.HashMap();
+        static
+        {
+            fieldMap.put("type", 1);
+            fieldMap.put("message", 2);
+        }
+    }
+
     public static final class PlanFragment
     {
         public static final org.apache.drill.exec.proto.SchemaBitControl.PlanFragment.MessageSchema WRITE =

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/protocol/src/main/java/org/apache/drill/exec/proto/beans/CustomMessage.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CustomMessage.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CustomMessage.java
new file mode 100644
index 0000000..af9195e
--- /dev/null
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CustomMessage.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Generated by http://code.google.com/p/protostuff/ ... DO NOT EDIT!
+// Generated from protobuf
+
+package org.apache.drill.exec.proto.beans;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import com.dyuproject.protostuff.ByteString;
+import com.dyuproject.protostuff.GraphIOUtil;
+import com.dyuproject.protostuff.Input;
+import com.dyuproject.protostuff.Message;
+import com.dyuproject.protostuff.Output;
+import com.dyuproject.protostuff.Schema;
+
+public final class CustomMessage implements Externalizable, Message, Schema
+{
+
+    public static Schema getSchema()
+    {
+        return DEFAULT_INSTANCE;
+    }
+
+    public static CustomMessage getDefaultInstance()
+    {
+        return DEFAULT_INSTANCE;
+    }
+
+    static final CustomMessage DEFAULT_INSTANCE = new CustomMessage();
+
+    
+    private int type;
+    private ByteString message;
+
+    public CustomMessage()
+    {
+        
+    }
+
+    // getters and setters
+
+    // type
+
+    public int getType()
+    {
+        return type;
+    }
+
+    public CustomMessage setType(int type)
+    {
+        this.type = type;
+        return this;
+    }
+
+    // message
+
+    public ByteString getMessage()
+    {
+        return message;
+    }
+
+    public CustomMessage setMessage(ByteString message)
+    {
+        this.message = message;
+        return this;
+    }
+
+    // java serialization
+
+    public void readExternal(ObjectInput in) throws IOException
+    {
+        GraphIOUtil.mergeDelimitedFrom(in, this, this);
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException
+    {
+        GraphIOUtil.writeDelimitedTo(out, this, this);
+    }
+
+    // message method
+
+    public Schema cachedSchema()
+    {
+        return DEFAULT_INSTANCE;
+    }
+
+    // schema methods
+
+    public CustomMessage newMessage()
+    {
+        return new CustomMessage();
+    }
+
+    public Class typeClass()
+    {
+        return CustomMessage.class;
+    }
+
+    public String messageName()
+    {
+        return CustomMessage.class.getSimpleName();
+    }
+
+    public String messageFullName()
+    {
+        return CustomMessage.class.getName();
+    }
+
+    public boolean isInitialized(CustomMessage message)
+    {
+        return true;
+    }
+
+    public void mergeFrom(Input input, CustomMessage message) throws IOException
+    {
+        for(int number = input.readFieldNumber(this);; number = input.readFieldNumber(this))
+        {
+            switch(number)
+            {
+                case 0:
+                    return;
+                case 1:
+                    message.type = input.readInt32();
+                    break;
+                case 2:
+                    message.message = input.readBytes();
+                    break;
+                default:
+                    input.handleUnknownField(number, this);
+            }   
+        }
+    }
+
+
+    public void writeTo(Output output, CustomMessage message) throws IOException
+    {
+        if(message.type != 0)
+            output.writeInt32(1, message.type, false);
+
+        if(message.message != null)
+            output.writeBytes(2, message.message, false);
+    }
+
+    public String getFieldName(int number)
+    {
+        switch(number)
+        {
+            case 1: return "type";
+            case 2: return "message";
+            default: return null;
+        }
+    }
+
+    public int getFieldNumber(String name)
+    {
+        final Integer number = __fieldMap.get(name);
+        return number == null ? 0 : number.intValue();
+    }
+
+    private static final java.util.HashMap __fieldMap = new java.util.HashMap();
+    static
+    {
+        __fieldMap.put("type", 1);
+        __fieldMap.put("message", 2);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/67d5cc6d/protocol/src/main/protobuf/BitControl.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/BitControl.proto b/protocol/src/main/protobuf/BitControl.proto
index 6d1b529..ca441f7 100644
--- a/protocol/src/main/protobuf/BitControl.proto
+++ b/protocol/src/main/protobuf/BitControl.proto
@@ -25,12 +25,14 @@ enum RpcType {
   REQ_QUERY_STATUS = 10;
   REQ_QUERY_CANCEL = 15;
   REQ_UNPAUSE_FRAGMENT = 16; // send a resume message for a fragment, returns Ack
+  REQ_CUSTOM = 17;
 
   // bit responses
   RESP_FRAGMENT_HANDLE = 11;
   RESP_FRAGMENT_STATUS = 12;
   RESP_BIT_STATUS = 13;
   RESP_QUERY_STATUS = 14;
+  RESP_CUSTOM = 18;
 }
 
 message BitControlHandshake{
@@ -52,6 +54,11 @@ message InitializeFragments {
   repeated PlanFragment fragment = 1;
 }
 
+message CustomMessage {
+  optional int32 type = 1;
+  optional bytes message = 2;
+}
+
 message PlanFragment {
   optional FragmentHandle handle = 1;
   optional float network_cost = 4;