drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [9/9] drill git commit: DRILL-2573: C++ Client - Separate QueryResult into QueryResult and QueryData
Date Sat, 04 Apr 2015 02:37:35 GMT
DRILL-2573: C++ Client - Separate QueryResult into QueryResult and QueryData


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4f213570
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4f213570
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4f213570

Branch: refs/heads/master
Commit: 4f213570f29a30c8609afacba0ca01cc33cdc7d0
Parents: 6a8e0ec
Author: Parth Chandra <pchandra@maprtech.com>
Authored: Fri Mar 27 11:21:07 2015 -0700
Committer: Parth Chandra <pchandra@maprtech.com>
Committed: Fri Apr 3 18:40:53 2015 -0700

----------------------------------------------------------------------
 .../native/client/example/querySubmitter.cpp    |  20 +-
 .../client/src/clientlib/drillClientImpl.cpp    | 372 ++++----
 .../client/src/clientlib/drillClientImpl.hpp    |  19 +-
 .../native/client/src/clientlib/recordBatch.cpp |   4 +-
 .../native/client/src/include/drill/common.hpp  |   2 +-
 .../client/src/include/drill/drillClient.hpp    |   4 +-
 .../client/src/include/drill/recordBatch.hpp    |   9 +-
 .../native/client/src/protobuf/BitData.pb.cc    | 289 ++++---
 contrib/native/client/src/protobuf/BitData.pb.h | 256 +++---
 contrib/native/client/src/protobuf/User.pb.cc   |  13 +-
 contrib/native/client/src/protobuf/User.pb.h    |   7 +-
 .../client/src/protobuf/UserBitShared.pb.cc     | 855 +++++++++----------
 .../client/src/protobuf/UserBitShared.pb.h      | 477 +++++------
 13 files changed, 1171 insertions(+), 1156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4f213570/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index bef64bf..85e89e0 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -71,15 +71,18 @@ Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::Dr
     // or
     // (received query state message passed by `err` and b is NULL)
     if(!err){
-        assert(b!=NULL);
-        b->print(std::cout, 0); // print all rows
-        std::cout << "DATA RECEIVED ..." << std::endl;
-        delete b; // we're done with this batch, we can delete it
-        if(bTestCancel){
-            return Drill::QRY_FAILURE;
+        if(b!=NULL){
+            b->print(std::cout, 0); // print all rows
+            std::cout << "DATA RECEIVED ..." << std::endl;
+            delete b; // we're done with this batch, we can delete it
+            if(bTestCancel){
+                return Drill::QRY_FAILURE;
+            }else{
+                return Drill::QRY_SUCCESS ;
+            }
         }else{
-            return Drill::QRY_SUCCESS ;
-        }
+            std::cout << "Query Complete." << std::endl;
+		}
     }else{
         assert(b==NULL);
         switch(err->status) {
@@ -392,6 +395,7 @@ int main(int argc, char* argv[]) {
                 }
                 client.freeQueryIterator(&pRecIter);
             }
+            client.waitForResults();
         }else{
             if(bSyncSend){
                 for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {

http://git-wip-us.apache.org/repos/asf/drill/blob/4f213570/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 71f960e..dce5bdc 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -53,7 +53,7 @@ static std::map<exec::shared::QueryResult_QueryState, status_t> QUERYSTATE_TO_ST
     (exec::shared::QueryResult_QueryState_COMPLETED, QRY_COMPLETED)
     (exec::shared::QueryResult_QueryState_CANCELED, QRY_CANCELED)
     (exec::shared::QueryResult_QueryState_FAILED, QRY_FAILED)
-    (exec::shared::QueryResult_QueryState_UNKNOWN_QUERY, QRY_UNKNOWN_QUERY);
+    ;
 
 RpcEncoder DrillClientImpl::s_encoder;
 RpcDecoder DrillClientImpl::s_decoder;
@@ -325,7 +325,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
         return ret;
     }
     if(m_handshakeVersion != u2b.rpc_version()) {
-        DRILL_LOG(LOG_TRACE) << "Invalid rpc version.  Expected << "
+        DRILL_LOG(LOG_TRACE) << "Invalid rpc version.  Expected "
             << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;
         return handleConnError(CONN_HANDSHAKE_FAILED,
                 getMessage(ERR_CONN_NOHSHAKE, DRILL_RPC_VERSION, m_handshakeVersion));
@@ -510,7 +510,59 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
 status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer, InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
     status_t ret=QRY_SUCCESS;
-	exec::shared::QueryId qid;
+    exec::shared::QueryId qid;
+    sendAck(msg, true);
+    {
+        boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+        exec::shared::QueryResult qr;
+
+        DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;
+        qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
+        DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;
+        
+        qid.CopyFrom(qr.query_id());
+        
+        if (qr.has_query_state() &&
+                qr.query_state() != exec::shared::QueryResult_QueryState_RUNNING &&
+                qr.query_state() != exec::shared::QueryResult_QueryState_PENDING) {
+            pDrillClientQueryResult=findQueryResult(qid);
+            //Queries that have been cancelled or whose resources are freed before completion 
+            //do not have a DrillClientQueryResult object. We need not handle the terminal message 
+            //in that case since all it does is to free resources (and they have already been freed)
+            if(pDrillClientQueryResult!=NULL){
+                //Validate the RPC message
+                std::string valErr;
+                if( (ret=validateResultMessage(msg, qr, valErr)) != QRY_SUCCESS){
+                    delete allocatedBuffer;
+                    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;
+                    return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
+                }
+                ret=processQueryStatusResult(&qr, pDrillClientQueryResult);
+            }else{
+                // We've received the final message for a query that has been cancelled
+                // or for which the resources have been freed. We no longer need to listen
+                // for more incoming messages for such a query.
+                DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl;
+                m_pendingRequests--;
+                DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl;
+                ret=QRY_CANCELED;
+            }
+            delete allocatedBuffer;
+            return ret;
+        }else{
+            // Normal query results come back with query_state not set.
+            // Actually this is not strictly true. The query state is set to
+            // 0(i.e. PENDING), but protobuf thinks this means the value is not set.
+            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";
+        }
+    }
+    return ret;
+}
+
+status_t DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer, InBoundRpcMessage& msg ){
+    DrillClientQueryResult* pDrillClientQueryResult=NULL;
+    status_t ret=QRY_SUCCESS;
+    exec::shared::QueryId qid;
     // Be a good client and send ack as early as possible.
     // Drillbit pushed the query result to the client, the client should send ack
     // whenever it receives the message
@@ -518,62 +570,34 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
     RecordBatch* pRecordBatch=NULL;
     {
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-        exec::shared::QueryResult* qr = new exec::shared::QueryResult; //Record Batch will own this object and free it up.
+        exec::shared::QueryData* qr = new exec::shared::QueryData; //Record Batch will own this object and free it up.
 
-        DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;
+        DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl;
         qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
         DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;
 
-        DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;
-
         qid.CopyFrom(qr->query_id());
-        std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
-        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;
-        if(m_queryResults.size() != 0){
-            for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){
-                DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_QueryResult ids: [" << it->first->part1() << ":"
-                    << it->first->part2() << "]\n";
-            }
-        }
         if(qid.part1()==0){
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: QID=0. Ignore and return QRY_SUCCESS." << std::endl;
-            return QRY_SUCCESS;
-        }
-        it=this->m_queryResults.find(&qid);
-        if(it!=this->m_queryResults.end()){
-            pDrillClientQueryResult=(*it).second;
-        }else{
-            ret=processCancelledQueryResult(qid, qr);
-            DRILL_LOG(LOG_TRACE) << "Cleaning up resource allocated for canceled quyery." << std::endl;
-            delete qr;
+            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;
             delete allocatedBuffer;
-            return ret;
+            return QRY_SUCCESS;
         }
-        DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
-            debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;
 
-        // Drillbit may send a query state change message which does not contain any
-        // record batch.
-        if (qr->has_query_state() &&
-                qr->query_state() != exec::shared::QueryResult_QueryState_RUNNING &&
-                qr->query_state() != exec::shared::QueryResult_QueryState_PENDING) {
-            ret=processQueryStatusResult(qr, pDrillClientQueryResult);
-            delete allocatedBuffer;
+        pDrillClientQueryResult=findQueryResult(qid);
+        if(pDrillClientQueryResult==NULL){
+            DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" 
+                                 << debugPrintQid(qid) << ")." << std::endl;
             delete qr;
+            delete allocatedBuffer;
             return ret;
-        }else{
-            // Normal query results come back with query_state not set.
-            // Actually this is not strictly true. The query state is set to
-            // 0(i.e. PENDING), but protobuf thinks this means the value is not set.
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";
         }
-
+        
         //Validate the RPC message
         std::string valErr;
-        if( (ret=validateMessage(msg, *qr, valErr)) != QRY_SUCCESS){
+        if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
             delete allocatedBuffer;
             delete qr;
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC.\n";
+            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";
             pDrillClientQueryResult->setQueryStatus(ret);
             return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
         }
@@ -590,16 +614,13 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
             << pRecordBatch->getNumRecords()  << std::endl;
         DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields "
             << pRecordBatch->getNumFields()  << std::endl;
-        DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.isLastChunk "
-            << pRecordBatch->isLastChunk()  << std::endl;
 
         ret=pDrillClientQueryResult->setupColumnDefs(qr);
         if(ret==QRY_SUCCESS_WITH_INFO){
             pRecordBatch->schemaChanged(true);
         }
 
-        pDrillClientQueryResult->m_bIsQueryPending=true;
-        pDrillClientQueryResult->m_bIsLastChunk=qr->is_last_chunk();
+        pDrillClientQueryResult->setIsQueryPending(true);
         pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener;
         if(pDrillClientQueryResult->m_bIsLastChunk){
             DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
@@ -619,7 +640,7 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
         sendCancel(&qid);
         // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are
         // pushed on the wire before the cancel is processed.
-        pDrillClientQueryResult->m_bIsQueryPending=false;
+        pDrillClientQueryResult->setIsQueryPending(false);
         DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;
         pDrillClientQueryResult->setQueryStatus(ret);
         clearMapEntries(pDrillClientQueryResult);
@@ -628,42 +649,6 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
     return ret;
 }
 
-status_t DrillClientImpl::processCancelledQueryResult(exec::shared::QueryId& qid, exec::shared::QueryResult* qr){
-    status_t ret=QRY_SUCCESS;
-    // look in cancelled queries
-    DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(qr->query_id()) << " has been cancelled." << std::endl;
-    std::set<exec::shared::QueryId*, compareQueryId>::iterator it2;
-    exec::shared::QueryId* pQid=NULL;//
-    it2=this->m_cancelledQueries.find(&qid);
-    if(it2!=this->m_cancelledQueries.end()){
-        pQid=(*it2);
-        if(qr->has_query_state()){
-            ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()];
-            if(qr->query_state()==exec::shared::QueryResult_QueryState_COMPLETED
-                    || qr->query_state()==exec::shared::QueryResult_QueryState_CANCELED
-                    || qr->query_state()==exec::shared::QueryResult_QueryState_FAILED) {
-                this->m_pendingRequests--;
-                this->m_cancelledQueries.erase(it2);
-                delete pQid;
-                DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(qr->query_id()) << " completed." << std::endl;
-                DRILL_LOG(LOG_DEBUG) << "Pending requests - " << this->m_pendingRequests << std::endl;
-            }
-        }
-    }else{
-        status_t ret=QRY_FAILED;
-        if(qr->has_query_state() && qr->query_state()==exec::shared::QueryResult_QueryState_COMPLETED){
-            ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()];
-        }else if(!qr->has_query_state() && qr->row_count()==0){
-            ret=QRY_SUCCESS;
-        }else{
-            //ret= handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_OUTOFORDER), NULL);
-            DRILL_LOG(LOG_DEBUG) << "Pending requests - " << getMessage(ERR_QRY_OUTOFORDER) << std::endl;
-            ret= QRY_SUCCESS;
-        }
-    }
-    return ret;
-}
-
 status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
     DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;
@@ -698,51 +683,72 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
     return ret;
 }
 
+DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId& qid){
+    DrillClientQueryResult* pDrillClientQueryResult=NULL;
+    DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;
+    std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
+    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;
+    if(m_queryResults.size() != 0){
+        for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){
+            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":"
+                << it->first->part2() << "]\n";
+        }
+    }
+    it=this->m_queryResults.find(&qid);
+    if(it!=this->m_queryResults.end()){
+        pDrillClientQueryResult=(*it).second;
+        DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
+            debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;
+    }
+    return pDrillClientQueryResult;
+}
+
 status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr,
         DrillClientQueryResult* pDrillClientQueryResult){
-        status_t ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()];
+    status_t ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()];
+    if(pDrillClientQueryResult!=NULL){
         pDrillClientQueryResult->setQueryStatus(ret);
         pDrillClientQueryResult->setQueryState(qr->query_state());
-        switch(qr->query_state()) {
-            case exec::shared::QueryResult_QueryState_FAILED:
-            case exec::shared::QueryResult_QueryState_UNKNOWN_QUERY:
-                {
-                    // get the error message from protobuf and handle errors
-                    ret=handleQryError(ret, qr->error(0), pDrillClientQueryResult);
-                }
-                break;
-                // m_pendingRequests should be decremented when the query is
-                // completed
-            case exec::shared::QueryResult_QueryState_CANCELED:
-                {
-                    ret=handleTerminatedQryState(ret,
-                            getMessage(ERR_QRY_CANCELED),
-                            pDrillClientQueryResult);
-                    m_pendingRequests--;
-                }
-                break;
-            case exec::shared::QueryResult_QueryState_COMPLETED:
-                {
-                    //Not clean to call the handleTerminatedQryState method
-                    //because it signals an error to the listener.
-                    //The ODBC driver expects this though and the sync API
-                    //handles this (luckily).
-                    ret=handleTerminatedQryState(ret,
-                            getMessage(ERR_QRY_COMPLETED),
-                            pDrillClientQueryResult);
-                    m_pendingRequests--;
-                }
-                break;
-            default:
-                {
-                    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Unknown Query State.\n";
-                    ret=handleQryError(QRY_INTERNAL_ERROR,
-                            getMessage(ERR_QRY_UNKQRYSTATE),
-                            pDrillClientQueryResult);
-                }
-                break;
-        }
-        return ret;
+    }
+    switch(qr->query_state()) {
+        case exec::shared::QueryResult_QueryState_FAILED:
+            {
+                // get the error message from protobuf and handle errors
+                ret=handleQryError(ret, qr->error(0), pDrillClientQueryResult);
+            }
+            break;
+            // m_pendingRequests should be decremented when the query is
+            // completed
+        case exec::shared::QueryResult_QueryState_CANCELED:
+            {
+                ret=handleTerminatedQryState(ret,
+                        getMessage(ERR_QRY_CANCELED),
+                        pDrillClientQueryResult);
+                m_pendingRequests--;
+            }
+            break;
+        case exec::shared::QueryResult_QueryState_COMPLETED:
+            {
+                //Not clean to call the handleTerminatedQryState method
+                //because it signals an error to the listener.
+                //The ODBC driver expects this though and the sync API
+                //handles this (luckily).
+                ret=handleTerminatedQryState(ret,
+                        getMessage(ERR_QRY_COMPLETED),
+                        pDrillClientQueryResult);
+                m_pendingRequests--;
+            }
+            break;
+        default:
+            {
+                DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";
+                ret=handleQryError(QRY_INTERNAL_ERROR,
+                        getMessage(ERR_QRY_UNKQRYSTATE),
+                        pDrillClientQueryResult);
+            }
+            break;
+    }
+    return ret;
 }
 
 void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
@@ -809,6 +815,14 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
                 }
                 return;
             }
+        }else if(!error && msg.m_rpc_type==exec::user::QUERY_DATA){
+            if(processQueryData(allocatedBuffer, msg)!=QRY_SUCCESS){
+                if(m_pendingRequests!=0){
+                    boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+                    getNextResult();
+                }
+                return;
+            }
         }else if(!error && msg.m_rpc_type==exec::user::QUERY_HANDLE){
             if(processQueryId(allocatedBuffer, msg)!=QRY_SUCCESS){
                 if(m_pendingRequests!=0){
@@ -820,6 +834,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
         }else if(!error && msg.m_rpc_type==exec::user::ACK){
             // Cancel requests will result in an ACK sent back.
             // Consume silently
+            delete allocatedBuffer;
             if(m_pendingRequests!=0){
                 boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
                 getNextResult();
@@ -830,6 +845,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
             if(error){
                 // We have a socket read error, but we do not know which query this is for.
                 // Signal ALL pending queries that they should stop waiting.
+                delete allocatedBuffer;
                 DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;
                 handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
                 return;
@@ -851,10 +867,11 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
                         DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";
                     }
                 }else{
-                DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+                    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
                         << "QueryResult returned " << msg.m_rpc_type << std::endl;
-                handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
+                    handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
                 }
+                delete allocatedBuffer;
                 return;
             }
         }
@@ -874,21 +891,25 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
     return;
 }
 
-status_t DrillClientImpl::validateMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valErr){
+status_t DrillClientImpl::validateDataMessage(InBoundRpcMessage& msg, exec::shared::QueryData& qd, std::string& valErr){
     if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){
         valErr=getMessage(ERR_QRY_RESPFAIL);
         return QRY_FAILURE;
     }
-    if(qr.query_state()== exec::shared::QueryResult_QueryState_UNKNOWN_QUERY){
-        valErr=getMessage(ERR_QRY_UNKQRY);
+    if(qd.def().carries_two_byte_selection_vector() == true){
+        valErr=getMessage(ERR_QRY_SELVEC2);
         return QRY_FAILURE;
     }
-    if(qr.query_state()== exec::shared::QueryResult_QueryState_CANCELED){
-        valErr=getMessage(ERR_QRY_CANCELED);
+    return QRY_SUCCESS;
+}
+
+status_t DrillClientImpl::validateResultMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valErr){
+    if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){
+        valErr=getMessage(ERR_QRY_RESPFAIL);
         return QRY_FAILURE;
     }
-    if(qr.def().carries_two_byte_selection_vector() == true){
-        valErr=getMessage(ERR_QRY_SELVEC2);
+    if(qr.query_state()==exec::shared::QueryResult_QueryState_CANCELED){
+        valErr=getMessage(ERR_QRY_CANCELED);
         return QRY_FAILURE;
     }
     return QRY_SUCCESS;
@@ -948,7 +969,9 @@ status_t DrillClientImpl::handleTerminatedQryState(
         std::string msg,
         DrillClientQueryResult* pQueryResult){
     assert(pQueryResult!=NULL);
-    if(status!=QRY_COMPLETED){
+    if(status==QRY_COMPLETED){
+        pQueryResult->signalComplete();
+    }else{
         // set query error only if queries did not complete successfully
         DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
         pQueryResult->signalError(pErr);
@@ -957,18 +980,6 @@ status_t DrillClientImpl::handleTerminatedQryState(
 }
 
 
-void DrillClientImpl::clearCancelledEntries(){
-
-    std::map<int, DrillClientQueryResult*>::iterator iter;
-    boost::lock_guard<boost::mutex> lock(m_dcMutex);
-
-    if(!m_cancelledQueries.empty()){
-        std::set<exec::shared::QueryId*, compareQueryId>::iterator it;
-        m_cancelledQueries.erase(m_cancelledQueries.begin(), m_cancelledQueries.end());
-    }
-}
-
-
 void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){
     std::map<int, DrillClientQueryResult*>::iterator iter;
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
@@ -981,13 +992,6 @@ void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){
         }
     }
     if(!m_queryResults.empty()){
-        // Save the query id and state and free when the query is complete
-        if(pQueryResult->getQueryState()!=exec::shared::QueryResult_QueryState_COMPLETED
-                && pQueryResult->getQueryState()!=exec::shared::QueryResult_QueryState_FAILED){
-            exec::shared::QueryId* pQueryId=new exec::shared::QueryId();
-            pQueryId->CopyFrom(pQueryResult->getQueryId());
-            m_cancelledQueries.insert(pQueryId);
-        }
         std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
         for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
             if(pQueryResult==(DrillClientQueryResult*)it->second){
@@ -1017,7 +1021,7 @@ void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
 
 // This COPIES the FieldMetadata definition for the record batch.  ColumnDefs held by this
 // class are used by the async callbacks.
-status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryResult* pQueryResult) {
+status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryData* pQueryData) {
     bool hasSchemaChanged=false;
     bool isFirstIter=false;
     boost::lock_guard<boost::mutex> schLock(this->m_schemaMutex);
@@ -1035,11 +1039,11 @@ status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryResult* pQue
         }
     }
     m_columnDefs->clear();
-    size_t numFields=pQueryResult->def().field_size();
+    size_t numFields=pQueryData->def().field_size();
     if (numFields > 0){
         for(size_t i=0; i<numFields; i++){
             Drill::FieldMetadata* fmd= new Drill::FieldMetadata;
-            fmd->set(pQueryResult->def().field(i));
+            fmd->set(pQueryData->def().field(i));
             this->m_columnDefs->push_back(fmd);
 
             //Look for changes in the vector and trigger a Schema change event if necessary.
@@ -1077,19 +1081,21 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
     DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;
     //check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server.
     if(this->m_bCancel){
-        delete b;
+        if(b!=NULL) delete b;
         return QRY_FAILURE;
     }
     if (!err) {
         // signal the cond var
         {
-            #ifdef DEBUG
-            DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
-                << "Query result listener saved result to queue." << std::endl;
-            #endif
-            boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
-            this->m_recordBatches.push(b);
-            this->m_bHasData=true;
+            if(b!=NULL){
+#ifdef DEBUG
+                DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
+                    << "Query result listener saved result to queue." << std::endl;
+#endif
+                boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
+                this->m_recordBatches.push(b);
+                this->m_bHasData=true;
+            }
         }
         m_cv.notify_one();
     }else{
@@ -1100,11 +1106,11 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
 
 RecordBatch*  DrillClientQueryResult::peekNext(){
     RecordBatch* pRecordBatch=NULL;
+    boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
     //if no more data, return NULL;
     if(!m_bIsQueryPending) return NULL;
-    boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
     DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
-    while(!this->m_bHasData && !m_bHasError) {
+    while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) {
         this->m_cv.wait(cvLock);
     }
     // READ but not remove first element from queue
@@ -1114,6 +1120,7 @@ RecordBatch*  DrillClientQueryResult::peekNext(){
 
 RecordBatch*  DrillClientQueryResult::getNext() {
     RecordBatch* pRecordBatch=NULL;
+    boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
     //if no more data, return NULL;
     if(!m_bIsQueryPending){
         DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;
@@ -1123,9 +1130,8 @@ RecordBatch*  DrillClientQueryResult::getNext() {
         return NULL;
     }
 
-    boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
     DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
-    while(!this->m_bHasData && !m_bHasError){
+    while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending){
         this->m_cv.wait(cvLock);
     }
     // remove first element from queue
@@ -1133,16 +1139,16 @@ RecordBatch*  DrillClientQueryResult::getNext() {
     this->m_recordBatches.pop();
     this->m_bHasData=!this->m_recordBatches.empty();
     // if vector is empty, set m_bHasDataPending to false;
-    m_bIsQueryPending=!(this->m_recordBatches.empty()&&m_bIsLastChunk);
+    m_bIsQueryPending=!(this->m_recordBatches.empty()&&m_queryState==exec::shared::QueryResult_QueryState_COMPLETED);
     return pRecordBatch;
 }
 
 // Blocks until data is available
 void DrillClientQueryResult::waitForData() {
+    boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
     //if no more data, return NULL;
     if(!m_bIsQueryPending) return;
-    boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
-    while(!this->m_bHasData && !m_bHasError) {
+    while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) {
         this->m_cv.wait(cvLock);
     }
 }
@@ -1164,9 +1170,9 @@ void DrillClientQueryResult::signalError(DrillClientError* pErr){
         }else{
             defaultQueryResultsListener(this, NULL, pErr);
         }
-        m_bIsQueryPending=false;
         {
             boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
+            m_bIsQueryPending=false;
             m_bHasData=false;
             m_bHasError=true;
         }
@@ -1176,6 +1182,24 @@ void DrillClientQueryResult::signalError(DrillClientError* pErr){
     return;
 }
 
+// Calls the results listener (registered or default) with a NULL batch and NULL error, then
+// recomputes m_bIsQueryPending and clears m_bHasError under m_cvMutex and notifies one waiter on m_cv.
+void DrillClientQueryResult::signalComplete(){
+    pfnQueryResultsListener pResultsListener=this->m_pResultsListener;
+    if(pResultsListener!=NULL){
+        pResultsListener(this, NULL, NULL);
+    }else{
+        defaultQueryResultsListener(this, NULL, NULL);
+    }
+    {
+        boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
+        m_bIsQueryPending=!(this->m_recordBatches.empty()&&m_queryState==exec::shared::QueryResult_QueryState_COMPLETED);
+        m_bHasError=false;
+    }
+    //Signal the cv in case there is a client waiting for data already.
+    m_cv.notify_one();
+}
+
 void DrillClientQueryResult::clearAndDestroy(){
     //free memory allocated for FieldMetadata objects saved in m_columnDefs;
     if(!m_columnDefs->empty()){
@@ -1207,7 +1231,7 @@ void DrillClientQueryResult::clearAndDestroy(){
     }
     if(m_pError!=NULL){
         delete m_pError; m_pError=NULL;
-}
+    }
 }
 
 char ZookeeperImpl::s_drillRoot[]="/drill/";

http://git-wip-us.apache.org/repos/asf/drill/blob/4f213570/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index 33f81db..95fe922 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -127,14 +127,19 @@ class DrillClientQueryResult{
 
     void setQueryState(exec::shared::QueryResult_QueryState s){ m_queryState = s;}
     exec::shared::QueryResult_QueryState getQueryState(){ return m_queryState;}
+    void setIsQueryPending(bool isPending){ // Writes m_bIsQueryPending while holding m_cvMutex.
+        boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
+        m_bIsQueryPending=isPending;
+    }
 
     private:
-    status_t setupColumnDefs(exec::shared::QueryResult* pQueryResult);
+    status_t setupColumnDefs(exec::shared::QueryData* pQueryData);
     status_t defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err);
     // Construct a DrillClientError object, set the appropriate state and signal any listeners, condition variables.
     // Also used when a query is cancelled or when a query completed response is received.
     // Error object is now owned by the DrillClientQueryResult object.
     void signalError(DrillClientError* pErr);
+    void signalComplete();
     void clearAndDestroy();
 
 
@@ -212,7 +217,6 @@ class DrillClientImpl{
                 this->m_pWork = NULL;
             }
 
-            clearCancelledEntries();
             m_deadlineTimer.cancel();
             m_io_service.stop();
             boost::system::error_code ignorederr;
@@ -272,13 +276,16 @@ class DrillClientImpl{
                 InBoundRpcMessage& msg,
                 boost::system::error_code& error);
         status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg);
+        status_t processQueryData(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg);
         status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr);
         status_t processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg );
+        DrillClientQueryResult* findQueryResult(exec::shared::QueryId& qid);
         status_t processQueryStatusResult( exec::shared::QueryResult* qr,
                 DrillClientQueryResult* pDrillClientQueryResult);
         void handleReadTimeout(const boost::system::error_code & err);
         void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ;
-        status_t validateMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError);
+        status_t validateDataMessage(InBoundRpcMessage& msg, exec::shared::QueryData& qd, std::string& valError);
+        status_t validateResultMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError);
         connectionStatus_t handleConnError(connectionStatus_t status, std::string msg);
         status_t handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult);
         status_t handleQryError(status_t status,
@@ -291,7 +298,6 @@ class DrillClientImpl{
                 DrillClientQueryResult* pQueryResult);
         void broadcastError(DrillClientError* pErr);
         void clearMapEntries(DrillClientQueryResult* pQueryResult);
-        void clearCancelledEntries();
         void sendAck(InBoundRpcMessage& msg, bool isOk);
         void sendCancel(exec::shared::QueryId* pQueryId);
 
@@ -335,11 +341,6 @@ class DrillClientImpl{
 
         // Map of query id to query result for currently executing queries
         std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId> m_queryResults;
-        //
-        // State for every Query id whose queries have result data pending but which
-        // have been cancelled and whose resources have been released by the client application.
-        // The entry is cleared when the state changes to completed or failed.
-        std::set<exec::shared::QueryId*, compareQueryId> m_cancelledQueries;
 
 };
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4f213570/contrib/native/client/src/clientlib/recordBatch.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/recordBatch.cpp b/contrib/native/client/src/clientlib/recordBatch.cpp
index 44140b2..c6c033b 100644
--- a/contrib/native/client/src/clientlib/recordBatch.cpp
+++ b/contrib/native/client/src/clientlib/recordBatch.cpp
@@ -306,7 +306,7 @@ ret_t FieldBatch::loadNull(size_t nRecords){
     return RET_SUCCESS;
 }
     
-RecordBatch::RecordBatch(exec::shared::QueryResult* pResult, AllocatedBufferPtr r, ByteBuf_t b)
+RecordBatch::RecordBatch(exec::shared::QueryData* pResult, AllocatedBufferPtr r, ByteBuf_t b)
     :m_fieldDefs(new(std::vector<Drill::FieldMetadata*>)){
         m_pQueryResult=pResult;
         m_pRecordBatchDef=&pResult->def();
@@ -398,7 +398,7 @@ size_t RecordBatch::getNumFields(){
 }
 
 bool RecordBatch::isLastChunk(){
-    return m_pQueryResult->is_last_chunk(); 
+    return false;
 }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4f213570/contrib/native/client/src/include/drill/common.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index 6560692..72b9a98 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -34,7 +34,7 @@
 #include <vector>
 #include <boost/shared_ptr.hpp>
 
-#define DRILL_RPC_VERSION 3
+#define DRILL_RPC_VERSION 4
 
 #define LENGTH_PREFIX_MAX_LENGTH 5
 #define LEN_PREFIX_BUFLEN LENGTH_PREFIX_MAX_LENGTH

http://git-wip-us.apache.org/repos/asf/drill/blob/4f213570/contrib/native/client/src/include/drill/drillClient.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp
index 9289df3..c288c70 100644
--- a/contrib/native/client/src/include/drill/drillClient.hpp
+++ b/contrib/native/client/src/include/drill/drillClient.hpp
@@ -167,7 +167,9 @@ typedef void* QueryHandle_t;
  * Query Results listener callback. This function is called for every record batch after it has
  * been received and decoded. The listener function should return a status.
  * If the listener returns failure, the query will be canceled.
- *
+ * The listener is also called one last time when the query is completed or gets an error. In that
+ * case the RecordBatch parameter is NULL. The DrillClientError parameter is NULL if there was no
+ * error; otherwise it will be a valid DrillClientError object.
  * DrillClientQueryResult will hold a listener & listener contxt for the call back function
  */
 typedef status_t (*pfnQueryResultsListener)(QueryHandle_t ctx, RecordBatch* b, DrillClientError* err);

http://git-wip-us.apache.org/repos/asf/drill/blob/4f213570/contrib/native/client/src/include/drill/recordBatch.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/recordBatch.hpp b/contrib/native/client/src/include/drill/recordBatch.hpp
index 92a4c3ad..12cbad4 100644
--- a/contrib/native/client/src/include/drill/recordBatch.hpp
+++ b/contrib/native/client/src/include/drill/recordBatch.hpp
@@ -56,6 +56,7 @@ namespace exec{
         class SerializedField;
         class RecordBatchDef;
         class QueryResult;
+        class QueryData;
     };
 };
 
@@ -863,7 +864,7 @@ class DECLSPEC_DRILL_CLIENT RecordBatch{
         //m_allocatedBuffer is the memory block allocated to hold the incoming RPC message. Record Batches operate on
         //slices of the allocated buffer. The first slice (the first Field Batch), begins at m_buffer. Data in the
         //allocated buffer before m_buffer is mostly the RPC header, and the QueryResult object.
-        RecordBatch(exec::shared::QueryResult* pResult, AllocatedBufferPtr r, ByteBuf_t b);
+        RecordBatch(exec::shared::QueryData* pResult, AllocatedBufferPtr r, ByteBuf_t b);
 
         ~RecordBatch();
 
@@ -876,7 +877,7 @@ class DECLSPEC_DRILL_CLIENT RecordBatch{
         size_t getNumRecords(){ return m_numRecords;}
         std::vector<FieldBatch*>& getFields(){ return m_fields;}
         size_t getNumFields();
-        bool isLastChunk();
+        DEPRECATED bool isLastChunk();
 
         boost::shared_ptr<std::vector<Drill::FieldMetadata*> > getColumnDefs(){ return m_fieldDefs;}
 
@@ -902,10 +903,10 @@ class DECLSPEC_DRILL_CLIENT RecordBatch{
         bool hasSchemaChanged(){ return m_bHasSchemaChanged;}
 
         #ifdef DEBUG
-        const exec::shared::QueryResult* getQueryResult(){ return this->m_pQueryResult;}
+        const exec::shared::QueryData* getQueryResult(){ return this->m_pQueryResult;}
         #endif
     private:
-        const exec::shared::QueryResult* m_pQueryResult;
+        const exec::shared::QueryData* m_pQueryResult;
         const exec::shared::RecordBatchDef* m_pRecordBatchDef;
         AllocatedBufferPtr m_allocatedBuffer;
         ByteBuf_t m_buffer;

http://git-wip-us.apache.org/repos/asf/drill/blob/4f213570/contrib/native/client/src/protobuf/BitData.pb.cc
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/protobuf/BitData.pb.cc b/contrib/native/client/src/protobuf/BitData.pb.cc
index ef4f99d..57bceff 100644
--- a/contrib/native/client/src/protobuf/BitData.pb.cc
+++ b/contrib/native/client/src/protobuf/BitData.pb.cc
@@ -43,10 +43,9 @@ void protobuf_AssignDesc_BitData_2eproto() {
       "BitData.proto");
   GOOGLE_CHECK(file != NULL);
   BitClientHandshake_descriptor_ = file->message_type(0);
-  static const int BitClientHandshake_offsets_[3] = {
+  static const int BitClientHandshake_offsets_[2] = {
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(BitClientHandshake, rpc_version_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(BitClientHandshake, channel_),
-    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(BitClientHandshake, handle_),
   };
   BitClientHandshake_reflection_ =
     new ::google::protobuf::internal::GeneratedMessageReflection(
@@ -75,8 +74,10 @@ void protobuf_AssignDesc_BitData_2eproto() {
       ::google::protobuf::MessageFactory::generated_factory(),
       sizeof(BitServerHandshake));
   FragmentRecordBatch_descriptor_ = file->message_type(2);
-  static const int FragmentRecordBatch_offsets_[6] = {
-    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(FragmentRecordBatch, handle_),
+  static const int FragmentRecordBatch_offsets_[8] = {
+    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(FragmentRecordBatch, query_id_),
+    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(FragmentRecordBatch, receiving_major_fragment_id_),
+    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(FragmentRecordBatch, receiving_minor_fragment_id_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(FragmentRecordBatch, sending_major_fragment_id_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(FragmentRecordBatch, sending_minor_fragment_id_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(FragmentRecordBatch, def_),
@@ -138,20 +139,21 @@ void protobuf_AddDesc_BitData_2eproto() {
   ::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
     "\n\rBitData.proto\022\rexec.bit.data\032\025Executio"
     "nProtos.proto\032\022Coordination.proto\032\023UserB"
-    "itShared.proto\"\207\001\n\022BitClientHandshake\022\023\n"
-    "\013rpc_version\030\001 \001(\005\0222\n\007channel\030\002 \001(\0162\027.ex"
-    "ec.shared.RpcChannel:\010BIT_DATA\022(\n\006handle"
-    "\030\003 \001(\0132\030.exec.bit.FragmentHandle\")\n\022BitS"
-    "erverHandshake\022\023\n\013rpc_version\030\001 \001(\005\"\342\001\n\023"
-    "FragmentRecordBatch\022(\n\006handle\030\001 \001(\0132\030.ex"
-    "ec.bit.FragmentHandle\022!\n\031sending_major_f"
-    "ragment_id\030\002 \001(\005\022!\n\031sending_minor_fragme"
-    "nt_id\030\003 \001(\005\022(\n\003def\030\004 \001(\0132\033.exec.shared.R"
-    "ecordBatchDef\022\023\n\013isLastBatch\030\005 \001(\010\022\034\n\ris"
-    "OutOfMemory\030\006 \001(\010:\005false*D\n\007RpcType\022\r\n\tH"
-    "ANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_"
-    "RECORD_BATCH\020\003B(\n\033org.apache.drill.exec."
-    "protoB\007BitDataH\001", 616);
+    "itShared.proto\"]\n\022BitClientHandshake\022\023\n\013"
+    "rpc_version\030\001 \001(\005\0222\n\007channel\030\002 \001(\0162\027.exe"
+    "c.shared.RpcChannel:\010BIT_DATA\")\n\022BitServ"
+    "erHandshake\022\023\n\013rpc_version\030\001 \001(\005\"\252\002\n\023Fra"
+    "gmentRecordBatch\022&\n\010query_id\030\001 \001(\0132\024.exe"
+    "c.shared.QueryId\022#\n\033receiving_major_frag"
+    "ment_id\030\002 \001(\005\022#\n\033receiving_minor_fragmen"
+    "t_id\030\003 \003(\005\022!\n\031sending_major_fragment_id\030"
+    "\004 \001(\005\022!\n\031sending_minor_fragment_id\030\005 \001(\005"
+    "\022(\n\003def\030\006 \001(\0132\033.exec.shared.RecordBatchD"
+    "ef\022\023\n\013isLastBatch\030\007 \001(\010\022\034\n\risOutOfMemory"
+    "\030\010 \001(\010:\005false*D\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022"
+    "\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATC"
+    "H\020\003B(\n\033org.apache.drill.exec.protoB\007BitD"
+    "ataH\001", 645);
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
     "BitData.proto", &protobuf_RegisterTypes);
   BitClientHandshake::default_instance_ = new BitClientHandshake();
@@ -191,7 +193,6 @@ bool RpcType_IsValid(int value) {
 #ifndef _MSC_VER
 const int BitClientHandshake::kRpcVersionFieldNumber;
 const int BitClientHandshake::kChannelFieldNumber;
-const int BitClientHandshake::kHandleFieldNumber;
 #endif  // !_MSC_VER
 
 BitClientHandshake::BitClientHandshake()
@@ -200,7 +201,6 @@ BitClientHandshake::BitClientHandshake()
 }
 
 void BitClientHandshake::InitAsDefaultInstance() {
-  handle_ = const_cast< ::exec::bit::FragmentHandle*>(&::exec::bit::FragmentHandle::default_instance());
 }
 
 BitClientHandshake::BitClientHandshake(const BitClientHandshake& from)
@@ -213,7 +213,6 @@ void BitClientHandshake::SharedCtor() {
   _cached_size_ = 0;
   rpc_version_ = 0;
   channel_ = 1;
-  handle_ = NULL;
   ::memset(_has_bits_, 0, sizeof(_has_bits_));
 }
 
@@ -223,7 +222,6 @@ BitClientHandshake::~BitClientHandshake() {
 
 void BitClientHandshake::SharedDtor() {
   if (this != default_instance_) {
-    delete handle_;
   }
 }
 
@@ -252,9 +250,6 @@ void BitClientHandshake::Clear() {
   if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) {
     rpc_version_ = 0;
     channel_ = 1;
-    if (has_handle()) {
-      if (handle_ != NULL) handle_->::exec::bit::FragmentHandle::Clear();
-    }
   }
   ::memset(_has_bits_, 0, sizeof(_has_bits_));
   mutable_unknown_fields()->Clear();
@@ -298,20 +293,6 @@ bool BitClientHandshake::MergePartialFromCodedStream(
         } else {
           goto handle_uninterpreted;
         }
-        if (input->ExpectTag(26)) goto parse_handle;
-        break;
-      }
-
-      // optional .exec.bit.FragmentHandle handle = 3;
-      case 3: {
-        if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
-            ::google::protobuf::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED) {
-         parse_handle:
-          DO_(::google::protobuf::internal::WireFormatLite::ReadMessageNoVirtual(
-               input, mutable_handle()));
-        } else {
-          goto handle_uninterpreted;
-        }
         if (input->ExpectAtEnd()) return true;
         break;
       }
@@ -345,12 +326,6 @@ void BitClientHandshake::SerializeWithCachedSizes(
       2, this->channel(), output);
   }
 
-  // optional .exec.bit.FragmentHandle handle = 3;
-  if (has_handle()) {
-    ::google::protobuf::internal::WireFormatLite::WriteMessageMaybeToArray(
-      3, this->handle(), output);
-  }
-
   if (!unknown_fields().empty()) {
     ::google::protobuf::internal::WireFormat::SerializeUnknownFields(
         unknown_fields(), output);
@@ -370,13 +345,6 @@ void BitClientHandshake::SerializeWithCachedSizes(
       2, this->channel(), target);
   }
 
-  // optional .exec.bit.FragmentHandle handle = 3;
-  if (has_handle()) {
-    target = ::google::protobuf::internal::WireFormatLite::
-      WriteMessageNoVirtualToArray(
-        3, this->handle(), target);
-  }
-
   if (!unknown_fields().empty()) {
     target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray(
         unknown_fields(), target);
@@ -401,13 +369,6 @@ int BitClientHandshake::ByteSize() const {
         ::google::protobuf::internal::WireFormatLite::EnumSize(this->channel());
     }
 
-    // optional .exec.bit.FragmentHandle handle = 3;
-    if (has_handle()) {
-      total_size += 1 +
-        ::google::protobuf::internal::WireFormatLite::MessageSizeNoVirtual(
-          this->handle());
-    }
-
   }
   if (!unknown_fields().empty()) {
     total_size +=
@@ -441,9 +402,6 @@ void BitClientHandshake::MergeFrom(const BitClientHandshake& from) {
     if (from.has_channel()) {
       set_channel(from.channel());
     }
-    if (from.has_handle()) {
-      mutable_handle()->::exec::bit::FragmentHandle::MergeFrom(from.handle());
-    }
   }
   mutable_unknown_fields()->MergeFrom(from.unknown_fields());
 }
@@ -469,7 +427,6 @@ void BitClientHandshake::Swap(BitClientHandshake* other) {
   if (other != this) {
     std::swap(rpc_version_, other->rpc_version_);
     std::swap(channel_, other->channel_);
-    std::swap(handle_, other->handle_);
     std::swap(_has_bits_[0], other->_has_bits_[0]);
     _unknown_fields_.Swap(&other->_unknown_fields_);
     std::swap(_cached_size_, other->_cached_size_);
@@ -696,7 +653,9 @@ void BitServerHandshake::Swap(BitServerHandshake* other) {
 // ===================================================================
 
 #ifndef _MSC_VER
-const int FragmentRecordBatch::kHandleFieldNumber;
+const int FragmentRecordBatch::kQueryIdFieldNumber;
+const int FragmentRecordBatch::kReceivingMajorFragmentIdFieldNumber;
+const int FragmentRecordBatch::kReceivingMinorFragmentIdFieldNumber;
 const int FragmentRecordBatch::kSendingMajorFragmentIdFieldNumber;
 const int FragmentRecordBatch::kSendingMinorFragmentIdFieldNumber;
 const int FragmentRecordBatch::kDefFieldNumber;
@@ -710,7 +669,7 @@ FragmentRecordBatch::FragmentRecordBatch()
 }
 
 void FragmentRecordBatch::InitAsDefaultInstance() {
-  handle_ = const_cast< ::exec::bit::FragmentHandle*>(&::exec::bit::FragmentHandle::default_instance());
+  query_id_ = const_cast< ::exec::shared::QueryId*>(&::exec::shared::QueryId::default_instance());
   def_ = const_cast< ::exec::shared::RecordBatchDef*>(&::exec::shared::RecordBatchDef::default_instance());
 }
 
@@ -722,7 +681,8 @@ FragmentRecordBatch::FragmentRecordBatch(const FragmentRecordBatch& from)
 
 void FragmentRecordBatch::SharedCtor() {
   _cached_size_ = 0;
-  handle_ = NULL;
+  query_id_ = NULL;
+  receiving_major_fragment_id_ = 0;
   sending_major_fragment_id_ = 0;
   sending_minor_fragment_id_ = 0;
   def_ = NULL;
@@ -737,7 +697,7 @@ FragmentRecordBatch::~FragmentRecordBatch() {
 
 void FragmentRecordBatch::SharedDtor() {
   if (this != default_instance_) {
-    delete handle_;
+    delete query_id_;
     delete def_;
   }
 }
@@ -765,9 +725,10 @@ FragmentRecordBatch* FragmentRecordBatch::New() const {
 
 void FragmentRecordBatch::Clear() {
   if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) {
-    if (has_handle()) {
-      if (handle_ != NULL) handle_->::exec::bit::FragmentHandle::Clear();
+    if (has_query_id()) {
+      if (query_id_ != NULL) query_id_->::exec::shared::QueryId::Clear();
     }
+    receiving_major_fragment_id_ = 0;
     sending_major_fragment_id_ = 0;
     sending_minor_fragment_id_ = 0;
     if (has_def()) {
@@ -776,6 +737,7 @@ void FragmentRecordBatch::Clear() {
     islastbatch_ = false;
     isoutofmemory_ = false;
   }
+  receiving_minor_fragment_id_.Clear();
   ::memset(_has_bits_, 0, sizeof(_has_bits_));
   mutable_unknown_fields()->Clear();
 }
@@ -786,23 +748,61 @@ bool FragmentRecordBatch::MergePartialFromCodedStream(
   ::google::protobuf::uint32 tag;
   while ((tag = input->ReadTag()) != 0) {
     switch (::google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag)) {
-      // optional .exec.bit.FragmentHandle handle = 1;
+      // optional .exec.shared.QueryId query_id = 1;
       case 1: {
         if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
             ::google::protobuf::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED) {
           DO_(::google::protobuf::internal::WireFormatLite::ReadMessageNoVirtual(
-               input, mutable_handle()));
+               input, mutable_query_id()));
         } else {
           goto handle_uninterpreted;
         }
-        if (input->ExpectTag(16)) goto parse_sending_major_fragment_id;
+        if (input->ExpectTag(16)) goto parse_receiving_major_fragment_id;
         break;
       }
 
-      // optional int32 sending_major_fragment_id = 2;
+      // optional int32 receiving_major_fragment_id = 2;
       case 2: {
         if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
             ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
+         parse_receiving_major_fragment_id:
+          DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
+                   ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>(
+                 input, &receiving_major_fragment_id_)));
+          set_has_receiving_major_fragment_id();
+        } else {
+          goto handle_uninterpreted;
+        }
+        if (input->ExpectTag(24)) goto parse_receiving_minor_fragment_id;
+        break;
+      }
+
+      // repeated int32 receiving_minor_fragment_id = 3;
+      case 3: {
+        if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
+            ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
+         parse_receiving_minor_fragment_id:
+          DO_((::google::protobuf::internal::WireFormatLite::ReadRepeatedPrimitive<
+                   ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>(
+                 1, 24, input, this->mutable_receiving_minor_fragment_id())));
+        } else if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag)
+                   == ::google::protobuf::internal::WireFormatLite::
+                      WIRETYPE_LENGTH_DELIMITED) {
+          DO_((::google::protobuf::internal::WireFormatLite::ReadPackedPrimitiveNoInline<
+                   ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>(
+                 input, this->mutable_receiving_minor_fragment_id())));
+        } else {
+          goto handle_uninterpreted;
+        }
+        if (input->ExpectTag(24)) goto parse_receiving_minor_fragment_id;
+        if (input->ExpectTag(32)) goto parse_sending_major_fragment_id;
+        break;
+      }
+
+      // optional int32 sending_major_fragment_id = 4;
+      case 4: {
+        if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
+            ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
          parse_sending_major_fragment_id:
           DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
                    ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>(
@@ -811,12 +811,12 @@ bool FragmentRecordBatch::MergePartialFromCodedStream(
         } else {
           goto handle_uninterpreted;
         }
-        if (input->ExpectTag(24)) goto parse_sending_minor_fragment_id;
+        if (input->ExpectTag(40)) goto parse_sending_minor_fragment_id;
         break;
       }
 
-      // optional int32 sending_minor_fragment_id = 3;
-      case 3: {
+      // optional int32 sending_minor_fragment_id = 5;
+      case 5: {
         if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
             ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
          parse_sending_minor_fragment_id:
@@ -827,12 +827,12 @@ bool FragmentRecordBatch::MergePartialFromCodedStream(
         } else {
           goto handle_uninterpreted;
         }
-        if (input->ExpectTag(34)) goto parse_def;
+        if (input->ExpectTag(50)) goto parse_def;
         break;
       }
 
-      // optional .exec.shared.RecordBatchDef def = 4;
-      case 4: {
+      // optional .exec.shared.RecordBatchDef def = 6;
+      case 6: {
         if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
             ::google::protobuf::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED) {
          parse_def:
@@ -841,12 +841,12 @@ bool FragmentRecordBatch::MergePartialFromCodedStream(
         } else {
           goto handle_uninterpreted;
         }
-        if (input->ExpectTag(40)) goto parse_isLastBatch;
+        if (input->ExpectTag(56)) goto parse_isLastBatch;
         break;
       }
 
-      // optional bool isLastBatch = 5;
-      case 5: {
+      // optional bool isLastBatch = 7;
+      case 7: {
         if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
             ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
          parse_isLastBatch:
@@ -857,12 +857,12 @@ bool FragmentRecordBatch::MergePartialFromCodedStream(
         } else {
           goto handle_uninterpreted;
         }
-        if (input->ExpectTag(48)) goto parse_isOutOfMemory;
+        if (input->ExpectTag(64)) goto parse_isOutOfMemory;
         break;
       }
 
-      // optional bool isOutOfMemory = 6 [default = false];
-      case 6: {
+      // optional bool isOutOfMemory = 8 [default = false];
+      case 8: {
         if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
             ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
          parse_isOutOfMemory:
@@ -895,36 +895,47 @@ bool FragmentRecordBatch::MergePartialFromCodedStream(
 
 void FragmentRecordBatch::SerializeWithCachedSizes(
     ::google::protobuf::io::CodedOutputStream* output) const {
-  // optional .exec.bit.FragmentHandle handle = 1;
-  if (has_handle()) {
+  // optional .exec.shared.QueryId query_id = 1;
+  if (has_query_id()) {
     ::google::protobuf::internal::WireFormatLite::WriteMessageMaybeToArray(
-      1, this->handle(), output);
+      1, this->query_id(), output);
   }
 
-  // optional int32 sending_major_fragment_id = 2;
+  // optional int32 receiving_major_fragment_id = 2;
+  if (has_receiving_major_fragment_id()) {
+    ::google::protobuf::internal::WireFormatLite::WriteInt32(2, this->receiving_major_fragment_id(), output);
+  }
+
+  // repeated int32 receiving_minor_fragment_id = 3;
+  for (int i = 0; i < this->receiving_minor_fragment_id_size(); i++) {
+    ::google::protobuf::internal::WireFormatLite::WriteInt32(
+      3, this->receiving_minor_fragment_id(i), output);
+  }
+
+  // optional int32 sending_major_fragment_id = 4;
   if (has_sending_major_fragment_id()) {
-    ::google::protobuf::internal::WireFormatLite::WriteInt32(2, this->sending_major_fragment_id(), output);
+    ::google::protobuf::internal::WireFormatLite::WriteInt32(4, this->sending_major_fragment_id(), output);
   }
 
-  // optional int32 sending_minor_fragment_id = 3;
+  // optional int32 sending_minor_fragment_id = 5;
   if (has_sending_minor_fragment_id()) {
-    ::google::protobuf::internal::WireFormatLite::WriteInt32(3, this->sending_minor_fragment_id(), output);
+    ::google::protobuf::internal::WireFormatLite::WriteInt32(5, this->sending_minor_fragment_id(), output);
   }
 
-  // optional .exec.shared.RecordBatchDef def = 4;
+  // optional .exec.shared.RecordBatchDef def = 6;
   if (has_def()) {
     ::google::protobuf::internal::WireFormatLite::WriteMessageMaybeToArray(
-      4, this->def(), output);
+      6, this->def(), output);
   }
 
-  // optional bool isLastBatch = 5;
+  // optional bool isLastBatch = 7;
   if (has_islastbatch()) {
-    ::google::protobuf::internal::WireFormatLite::WriteBool(5, this->islastbatch(), output);
+    ::google::protobuf::internal::WireFormatLite::WriteBool(7, this->islastbatch(), output);
   }
 
-  // optional bool isOutOfMemory = 6 [default = false];
+  // optional bool isOutOfMemory = 8 [default = false];
   if (has_isoutofmemory()) {
-    ::google::protobuf::internal::WireFormatLite::WriteBool(6, this->isoutofmemory(), output);
+    ::google::protobuf::internal::WireFormatLite::WriteBool(8, this->isoutofmemory(), output);
   }
 
   if (!unknown_fields().empty()) {
@@ -935,38 +946,49 @@ void FragmentRecordBatch::SerializeWithCachedSizes(
 
 ::google::protobuf::uint8* FragmentRecordBatch::SerializeWithCachedSizesToArray(
     ::google::protobuf::uint8* target) const {
-  // optional .exec.bit.FragmentHandle handle = 1;
-  if (has_handle()) {
+  // optional .exec.shared.QueryId query_id = 1;
+  if (has_query_id()) {
     target = ::google::protobuf::internal::WireFormatLite::
       WriteMessageNoVirtualToArray(
-        1, this->handle(), target);
+        1, this->query_id(), target);
+  }
+
+  // optional int32 receiving_major_fragment_id = 2;
+  if (has_receiving_major_fragment_id()) {
+    target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(2, this->receiving_major_fragment_id(), target);
+  }
+
+  // repeated int32 receiving_minor_fragment_id = 3;
+  for (int i = 0; i < this->receiving_minor_fragment_id_size(); i++) {
+    target = ::google::protobuf::internal::WireFormatLite::
+      WriteInt32ToArray(3, this->receiving_minor_fragment_id(i), target);
   }
 
-  // optional int32 sending_major_fragment_id = 2;
+  // optional int32 sending_major_fragment_id = 4;
   if (has_sending_major_fragment_id()) {
-    target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(2, this->sending_major_fragment_id(), target);
+    target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(4, this->sending_major_fragment_id(), target);
   }
 
-  // optional int32 sending_minor_fragment_id = 3;
+  // optional int32 sending_minor_fragment_id = 5;
   if (has_sending_minor_fragment_id()) {
-    target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(3, this->sending_minor_fragment_id(), target);
+    target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(5, this->sending_minor_fragment_id(), target);
   }
 
-  // optional .exec.shared.RecordBatchDef def = 4;
+  // optional .exec.shared.RecordBatchDef def = 6;
   if (has_def()) {
     target = ::google::protobuf::internal::WireFormatLite::
       WriteMessageNoVirtualToArray(
-        4, this->def(), target);
+        6, this->def(), target);
   }
 
-  // optional bool isLastBatch = 5;
+  // optional bool isLastBatch = 7;
   if (has_islastbatch()) {
-    target = ::google::protobuf::internal::WireFormatLite::WriteBoolToArray(5, this->islastbatch(), target);
+    target = ::google::protobuf::internal::WireFormatLite::WriteBoolToArray(7, this->islastbatch(), target);
   }
 
-  // optional bool isOutOfMemory = 6 [default = false];
+  // optional bool isOutOfMemory = 8 [default = false];
   if (has_isoutofmemory()) {
-    target = ::google::protobuf::internal::WireFormatLite::WriteBoolToArray(6, this->isoutofmemory(), target);
+    target = ::google::protobuf::internal::WireFormatLite::WriteBoolToArray(8, this->isoutofmemory(), target);
   }
 
   if (!unknown_fields().empty()) {
@@ -980,45 +1002,62 @@ int FragmentRecordBatch::ByteSize() const {
   int total_size = 0;
 
   if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) {
-    // optional .exec.bit.FragmentHandle handle = 1;
-    if (has_handle()) {
+    // optional .exec.shared.QueryId query_id = 1;
+    if (has_query_id()) {
       total_size += 1 +
         ::google::protobuf::internal::WireFormatLite::MessageSizeNoVirtual(
-          this->handle());
+          this->query_id());
     }
 
-    // optional int32 sending_major_fragment_id = 2;
+    // optional int32 receiving_major_fragment_id = 2;
+    if (has_receiving_major_fragment_id()) {
+      total_size += 1 +
+        ::google::protobuf::internal::WireFormatLite::Int32Size(
+          this->receiving_major_fragment_id());
+    }
+
+    // optional int32 sending_major_fragment_id = 4;
     if (has_sending_major_fragment_id()) {
       total_size += 1 +
         ::google::protobuf::internal::WireFormatLite::Int32Size(
           this->sending_major_fragment_id());
     }
 
-    // optional int32 sending_minor_fragment_id = 3;
+    // optional int32 sending_minor_fragment_id = 5;
     if (has_sending_minor_fragment_id()) {
       total_size += 1 +
         ::google::protobuf::internal::WireFormatLite::Int32Size(
           this->sending_minor_fragment_id());
     }
 
-    // optional .exec.shared.RecordBatchDef def = 4;
+    // optional .exec.shared.RecordBatchDef def = 6;
     if (has_def()) {
       total_size += 1 +
         ::google::protobuf::internal::WireFormatLite::MessageSizeNoVirtual(
           this->def());
     }
 
-    // optional bool isLastBatch = 5;
+    // optional bool isLastBatch = 7;
     if (has_islastbatch()) {
       total_size += 1 + 1;
     }
 
-    // optional bool isOutOfMemory = 6 [default = false];
+    // optional bool isOutOfMemory = 8 [default = false];
     if (has_isoutofmemory()) {
       total_size += 1 + 1;
     }
 
   }
+  // repeated int32 receiving_minor_fragment_id = 3;
+  {
+    int data_size = 0;
+    for (int i = 0; i < this->receiving_minor_fragment_id_size(); i++) {
+      data_size += ::google::protobuf::internal::WireFormatLite::
+        Int32Size(this->receiving_minor_fragment_id(i));
+    }
+    total_size += 1 * this->receiving_minor_fragment_id_size() + data_size;
+  }
+
   if (!unknown_fields().empty()) {
     total_size +=
       ::google::protobuf::internal::WireFormat::ComputeUnknownFieldsSize(
@@ -1044,9 +1083,13 @@ void FragmentRecordBatch::MergeFrom(const ::google::protobuf::Message& from) {
 
 void FragmentRecordBatch::MergeFrom(const FragmentRecordBatch& from) {
   GOOGLE_CHECK_NE(&from, this);
+  receiving_minor_fragment_id_.MergeFrom(from.receiving_minor_fragment_id_);
   if (from._has_bits_[0 / 32] & (0xffu << (0 % 32))) {
-    if (from.has_handle()) {
-      mutable_handle()->::exec::bit::FragmentHandle::MergeFrom(from.handle());
+    if (from.has_query_id()) {
+      mutable_query_id()->::exec::shared::QueryId::MergeFrom(from.query_id());
+    }
+    if (from.has_receiving_major_fragment_id()) {
+      set_receiving_major_fragment_id(from.receiving_major_fragment_id());
     }
     if (from.has_sending_major_fragment_id()) {
       set_sending_major_fragment_id(from.sending_major_fragment_id());
@@ -1086,7 +1129,9 @@ bool FragmentRecordBatch::IsInitialized() const {
 
 void FragmentRecordBatch::Swap(FragmentRecordBatch* other) {
   if (other != this) {
-    std::swap(handle_, other->handle_);
+    std::swap(query_id_, other->query_id_);
+    std::swap(receiving_major_fragment_id_, other->receiving_major_fragment_id_);
+    receiving_minor_fragment_id_.Swap(&other->receiving_minor_fragment_id_);
     std::swap(sending_major_fragment_id_, other->sending_major_fragment_id_);
     std::swap(sending_minor_fragment_id_, other->sending_minor_fragment_id_);
     std::swap(def_, other->def_);

http://git-wip-us.apache.org/repos/asf/drill/blob/4f213570/contrib/native/client/src/protobuf/BitData.pb.h
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/protobuf/BitData.pb.h b/contrib/native/client/src/protobuf/BitData.pb.h
index f1f9353..806d7f7 100644
--- a/contrib/native/client/src/protobuf/BitData.pb.h
+++ b/contrib/native/client/src/protobuf/BitData.pb.h
@@ -134,32 +134,20 @@ class BitClientHandshake : public ::google::protobuf::Message {
   inline ::exec::shared::RpcChannel channel() const;
   inline void set_channel(::exec::shared::RpcChannel value);
 
-  // optional .exec.bit.FragmentHandle handle = 3;
-  inline bool has_handle() const;
-  inline void clear_handle();
-  static const int kHandleFieldNumber = 3;
-  inline const ::exec::bit::FragmentHandle& handle() const;
-  inline ::exec::bit::FragmentHandle* mutable_handle();
-  inline ::exec::bit::FragmentHandle* release_handle();
-  inline void set_allocated_handle(::exec::bit::FragmentHandle* handle);
-
   // @@protoc_insertion_point(class_scope:exec.bit.data.BitClientHandshake)
  private:
   inline void set_has_rpc_version();
   inline void clear_has_rpc_version();
   inline void set_has_channel();
   inline void clear_has_channel();
-  inline void set_has_handle();
-  inline void clear_has_handle();
 
   ::google::protobuf::UnknownFieldSet _unknown_fields_;
 
   ::google::protobuf::int32 rpc_version_;
   int channel_;
-  ::exec::bit::FragmentHandle* handle_;
 
   mutable int _cached_size_;
-  ::google::protobuf::uint32 _has_bits_[(3 + 31) / 32];
+  ::google::protobuf::uint32 _has_bits_[(2 + 31) / 32];
 
   friend void  protobuf_AddDesc_BitData_2eproto();
   friend void protobuf_AssignDesc_BitData_2eproto();
@@ -306,56 +294,77 @@ class FragmentRecordBatch : public ::google::protobuf::Message {
 
   // accessors -------------------------------------------------------
 
-  // optional .exec.bit.FragmentHandle handle = 1;
-  inline bool has_handle() const;
-  inline void clear_handle();
-  static const int kHandleFieldNumber = 1;
-  inline const ::exec::bit::FragmentHandle& handle() const;
-  inline ::exec::bit::FragmentHandle* mutable_handle();
-  inline ::exec::bit::FragmentHandle* release_handle();
-  inline void set_allocated_handle(::exec::bit::FragmentHandle* handle);
-
-  // optional int32 sending_major_fragment_id = 2;
+  // optional .exec.shared.QueryId query_id = 1;
+  inline bool has_query_id() const;
+  inline void clear_query_id();
+  static const int kQueryIdFieldNumber = 1;
+  inline const ::exec::shared::QueryId& query_id() const;
+  inline ::exec::shared::QueryId* mutable_query_id();
+  inline ::exec::shared::QueryId* release_query_id();
+  inline void set_allocated_query_id(::exec::shared::QueryId* query_id);
+
+  // optional int32 receiving_major_fragment_id = 2;
+  inline bool has_receiving_major_fragment_id() const;
+  inline void clear_receiving_major_fragment_id();
+  static const int kReceivingMajorFragmentIdFieldNumber = 2;
+  inline ::google::protobuf::int32 receiving_major_fragment_id() const;
+  inline void set_receiving_major_fragment_id(::google::protobuf::int32 value);
+
+  // repeated int32 receiving_minor_fragment_id = 3;
+  inline int receiving_minor_fragment_id_size() const;
+  inline void clear_receiving_minor_fragment_id();
+  static const int kReceivingMinorFragmentIdFieldNumber = 3;
+  inline ::google::protobuf::int32 receiving_minor_fragment_id(int index) const;
+  inline void set_receiving_minor_fragment_id(int index, ::google::protobuf::int32 value);
+  inline void add_receiving_minor_fragment_id(::google::protobuf::int32 value);
+  inline const ::google::protobuf::RepeatedField< ::google::protobuf::int32 >&
+      receiving_minor_fragment_id() const;
+  inline ::google::protobuf::RepeatedField< ::google::protobuf::int32 >*
+      mutable_receiving_minor_fragment_id();
+
+  // optional int32 sending_major_fragment_id = 4;
   inline bool has_sending_major_fragment_id() const;
   inline void clear_sending_major_fragment_id();
-  static const int kSendingMajorFragmentIdFieldNumber = 2;
+  static const int kSendingMajorFragmentIdFieldNumber = 4;
   inline ::google::protobuf::int32 sending_major_fragment_id() const;
   inline void set_sending_major_fragment_id(::google::protobuf::int32 value);
 
-  // optional int32 sending_minor_fragment_id = 3;
+  // optional int32 sending_minor_fragment_id = 5;
   inline bool has_sending_minor_fragment_id() const;
   inline void clear_sending_minor_fragment_id();
-  static const int kSendingMinorFragmentIdFieldNumber = 3;
+  static const int kSendingMinorFragmentIdFieldNumber = 5;
   inline ::google::protobuf::int32 sending_minor_fragment_id() const;
   inline void set_sending_minor_fragment_id(::google::protobuf::int32 value);
 
-  // optional .exec.shared.RecordBatchDef def = 4;
+  // optional .exec.shared.RecordBatchDef def = 6;
   inline bool has_def() const;
   inline void clear_def();
-  static const int kDefFieldNumber = 4;
+  static const int kDefFieldNumber = 6;
   inline const ::exec::shared::RecordBatchDef& def() const;
   inline ::exec::shared::RecordBatchDef* mutable_def();
   inline ::exec::shared::RecordBatchDef* release_def();
   inline void set_allocated_def(::exec::shared::RecordBatchDef* def);
 
-  // optional bool isLastBatch = 5;
+  // optional bool isLastBatch = 7;
   inline bool has_islastbatch() const;
   inline void clear_islastbatch();
-  static const int kIsLastBatchFieldNumber = 5;
+  static const int kIsLastBatchFieldNumber = 7;
   inline bool islastbatch() const;
   inline void set_islastbatch(bool value);
 
-  // optional bool isOutOfMemory = 6 [default = false];
+  // optional bool isOutOfMemory = 8 [default = false];
   inline bool has_isoutofmemory() const;
   inline void clear_isoutofmemory();
-  static const int kIsOutOfMemoryFieldNumber = 6;
+  static const int kIsOutOfMemoryFieldNumber = 8;
   inline bool isoutofmemory() const;
   inline void set_isoutofmemory(bool value);
 
   // @@protoc_insertion_point(class_scope:exec.bit.data.FragmentRecordBatch)
  private:
-  inline void set_has_handle();
-  inline void clear_has_handle();
+  inline void set_has_query_id();
+  inline void clear_has_query_id();
+  inline void set_has_receiving_major_fragment_id();
+  inline void clear_has_receiving_major_fragment_id();
   inline void set_has_sending_major_fragment_id();
   inline void clear_has_sending_major_fragment_id();
   inline void set_has_sending_minor_fragment_id();
@@ -369,15 +378,17 @@ class FragmentRecordBatch : public ::google::protobuf::Message {
 
   ::google::protobuf::UnknownFieldSet _unknown_fields_;
 
-  ::exec::bit::FragmentHandle* handle_;
+  ::exec::shared::QueryId* query_id_;
+  ::google::protobuf::RepeatedField< ::google::protobuf::int32 > receiving_minor_fragment_id_;
+  ::google::protobuf::int32 receiving_major_fragment_id_;
   ::google::protobuf::int32 sending_major_fragment_id_;
-  ::google::protobuf::int32 sending_minor_fragment_id_;
   ::exec::shared::RecordBatchDef* def_;
+  ::google::protobuf::int32 sending_minor_fragment_id_;
   bool islastbatch_;
   bool isoutofmemory_;
 
   mutable int _cached_size_;
-  ::google::protobuf::uint32 _has_bits_[(6 + 31) / 32];
+  ::google::protobuf::uint32 _has_bits_[(8 + 31) / 32];
 
   friend void  protobuf_AddDesc_BitData_2eproto();
   friend void protobuf_AssignDesc_BitData_2eproto();
@@ -438,44 +449,6 @@ inline void BitClientHandshake::set_channel(::exec::shared::RpcChannel value) {
   channel_ = value;
 }
 
-// optional .exec.bit.FragmentHandle handle = 3;
-inline bool BitClientHandshake::has_handle() const {
-  return (_has_bits_[0] & 0x00000004u) != 0;
-}
-inline void BitClientHandshake::set_has_handle() {
-  _has_bits_[0] |= 0x00000004u;
-}
-inline void BitClientHandshake::clear_has_handle() {
-  _has_bits_[0] &= ~0x00000004u;
-}
-inline void BitClientHandshake::clear_handle() {
-  if (handle_ != NULL) handle_->::exec::bit::FragmentHandle::Clear();
-  clear_has_handle();
-}
-inline const ::exec::bit::FragmentHandle& BitClientHandshake::handle() const {
-  return handle_ != NULL ? *handle_ : *default_instance_->handle_;
-}
-inline ::exec::bit::FragmentHandle* BitClientHandshake::mutable_handle() {
-  set_has_handle();
-  if (handle_ == NULL) handle_ = new ::exec::bit::FragmentHandle;
-  return handle_;
-}
-inline ::exec::bit::FragmentHandle* BitClientHandshake::release_handle() {
-  clear_has_handle();
-  ::exec::bit::FragmentHandle* temp = handle_;
-  handle_ = NULL;
-  return temp;
-}
-inline void BitClientHandshake::set_allocated_handle(::exec::bit::FragmentHandle* handle) {
-  delete handle_;
-  handle_ = handle;
-  if (handle) {
-    set_has_handle();
-  } else {
-    clear_has_handle();
-  }
-}
-
 // -------------------------------------------------------------------
 
 // BitServerHandshake
@@ -506,54 +479,101 @@ inline void BitServerHandshake::set_rpc_version(::google::protobuf::int32 value)
 
 // FragmentRecordBatch
 
-// optional .exec.bit.FragmentHandle handle = 1;
-inline bool FragmentRecordBatch::has_handle() const {
+// optional .exec.shared.QueryId query_id = 1;
+inline bool FragmentRecordBatch::has_query_id() const {
   return (_has_bits_[0] & 0x00000001u) != 0;
 }
-inline void FragmentRecordBatch::set_has_handle() {
+inline void FragmentRecordBatch::set_has_query_id() {
   _has_bits_[0] |= 0x00000001u;
 }
-inline void FragmentRecordBatch::clear_has_handle() {
+inline void FragmentRecordBatch::clear_has_query_id() {
   _has_bits_[0] &= ~0x00000001u;
 }
-inline void FragmentRecordBatch::clear_handle() {
-  if (handle_ != NULL) handle_->::exec::bit::FragmentHandle::Clear();
-  clear_has_handle();
+inline void FragmentRecordBatch::clear_query_id() {
+  if (query_id_ != NULL) query_id_->::exec::shared::QueryId::Clear();
+  clear_has_query_id();
 }
-inline const ::exec::bit::FragmentHandle& FragmentRecordBatch::handle() const {
-  return handle_ != NULL ? *handle_ : *default_instance_->handle_;
+inline const ::exec::shared::QueryId& FragmentRecordBatch::query_id() const {
+  return query_id_ != NULL ? *query_id_ : *default_instance_->query_id_;
 }
-inline ::exec::bit::FragmentHandle* FragmentRecordBatch::mutable_handle() {
-  set_has_handle();
-  if (handle_ == NULL) handle_ = new ::exec::bit::FragmentHandle;
-  return handle_;
+inline ::exec::shared::QueryId* FragmentRecordBatch::mutable_query_id() {
+  set_has_query_id();
+  if (query_id_ == NULL) query_id_ = new ::exec::shared::QueryId;
+  return query_id_;
 }
-inline ::exec::bit::FragmentHandle* FragmentRecordBatch::release_handle() {
-  clear_has_handle();
-  ::exec::bit::FragmentHandle* temp = handle_;
-  handle_ = NULL;
+inline ::exec::shared::QueryId* FragmentRecordBatch::release_query_id() {
+  clear_has_query_id();
+  ::exec::shared::QueryId* temp = query_id_;
+  query_id_ = NULL;
   return temp;
 }
-inline void FragmentRecordBatch::set_allocated_handle(::exec::bit::FragmentHandle* handle) {
-  delete handle_;
-  handle_ = handle;
-  if (handle) {
-    set_has_handle();
+inline void FragmentRecordBatch::set_allocated_query_id(::exec::shared::QueryId* query_id) {
+  delete query_id_;
+  query_id_ = query_id;
+  if (query_id) {
+    set_has_query_id();
   } else {
-    clear_has_handle();
+    clear_has_query_id();
   }
 }
 
-// optional int32 sending_major_fragment_id = 2;
-inline bool FragmentRecordBatch::has_sending_major_fragment_id() const {
+// optional int32 receiving_major_fragment_id = 2;
+inline bool FragmentRecordBatch::has_receiving_major_fragment_id() const {
   return (_has_bits_[0] & 0x00000002u) != 0;
 }
-inline void FragmentRecordBatch::set_has_sending_major_fragment_id() {
+inline void FragmentRecordBatch::set_has_receiving_major_fragment_id() {
   _has_bits_[0] |= 0x00000002u;
 }
-inline void FragmentRecordBatch::clear_has_sending_major_fragment_id() {
+inline void FragmentRecordBatch::clear_has_receiving_major_fragment_id() {
   _has_bits_[0] &= ~0x00000002u;
 }
+inline void FragmentRecordBatch::clear_receiving_major_fragment_id() {
+  receiving_major_fragment_id_ = 0;
+  clear_has_receiving_major_fragment_id();
+}
+inline ::google::protobuf::int32 FragmentRecordBatch::receiving_major_fragment_id() const {
+  return receiving_major_fragment_id_;
+}
+inline void FragmentRecordBatch::set_receiving_major_fragment_id(::google::protobuf::int32 value) {
+  set_has_receiving_major_fragment_id();
+  receiving_major_fragment_id_ = value;
+}
+
+// repeated int32 receiving_minor_fragment_id = 3;
+inline int FragmentRecordBatch::receiving_minor_fragment_id_size() const {
+  return receiving_minor_fragment_id_.size();
+}
+inline void FragmentRecordBatch::clear_receiving_minor_fragment_id() {
+  receiving_minor_fragment_id_.Clear();
+}
+inline ::google::protobuf::int32 FragmentRecordBatch::receiving_minor_fragment_id(int index) const {
+  return receiving_minor_fragment_id_.Get(index);
+}
+inline void FragmentRecordBatch::set_receiving_minor_fragment_id(int index, ::google::protobuf::int32 value) {
+  receiving_minor_fragment_id_.Set(index, value);
+}
+inline void FragmentRecordBatch::add_receiving_minor_fragment_id(::google::protobuf::int32 value) {
+  receiving_minor_fragment_id_.Add(value);
+}
+inline const ::google::protobuf::RepeatedField< ::google::protobuf::int32 >&
+FragmentRecordBatch::receiving_minor_fragment_id() const {
+  return receiving_minor_fragment_id_;
+}
+inline ::google::protobuf::RepeatedField< ::google::protobuf::int32 >*
+FragmentRecordBatch::mutable_receiving_minor_fragment_id() {
+  return &receiving_minor_fragment_id_;
+}
+
+// optional int32 sending_major_fragment_id = 4;
+inline bool FragmentRecordBatch::has_sending_major_fragment_id() const {
+  return (_has_bits_[0] & 0x00000008u) != 0;
+}
+inline void FragmentRecordBatch::set_has_sending_major_fragment_id() {
+  _has_bits_[0] |= 0x00000008u;
+}
+inline void FragmentRecordBatch::clear_has_sending_major_fragment_id() {
+  _has_bits_[0] &= ~0x00000008u;
+}
 inline void FragmentRecordBatch::clear_sending_major_fragment_id() {
   sending_major_fragment_id_ = 0;
   clear_has_sending_major_fragment_id();
@@ -566,15 +586,15 @@ inline void FragmentRecordBatch::set_sending_major_fragment_id(::google::protobu
   sending_major_fragment_id_ = value;
 }
 
-// optional int32 sending_minor_fragment_id = 3;
+// optional int32 sending_minor_fragment_id = 5;
 inline bool FragmentRecordBatch::has_sending_minor_fragment_id() const {
-  return (_has_bits_[0] & 0x00000004u) != 0;
+  return (_has_bits_[0] & 0x00000010u) != 0;
 }
 inline void FragmentRecordBatch::set_has_sending_minor_fragment_id() {
-  _has_bits_[0] |= 0x00000004u;
+  _has_bits_[0] |= 0x00000010u;
 }
 inline void FragmentRecordBatch::clear_has_sending_minor_fragment_id() {
-  _has_bits_[0] &= ~0x00000004u;
+  _has_bits_[0] &= ~0x00000010u;
 }
 inline void FragmentRecordBatch::clear_sending_minor_fragment_id() {
   sending_minor_fragment_id_ = 0;
@@ -588,15 +608,15 @@ inline void FragmentRecordBatch::set_sending_minor_fragment_id(::google::protobu
   sending_minor_fragment_id_ = value;
 }
 
-// optional .exec.shared.RecordBatchDef def = 4;
+// optional .exec.shared.RecordBatchDef def = 6;
 inline bool FragmentRecordBatch::has_def() const {
-  return (_has_bits_[0] & 0x00000008u) != 0;
+  return (_has_bits_[0] & 0x00000020u) != 0;
 }
 inline void FragmentRecordBatch::set_has_def() {
-  _has_bits_[0] |= 0x00000008u;
+  _has_bits_[0] |= 0x00000020u;
 }
 inline void FragmentRecordBatch::clear_has_def() {
-  _has_bits_[0] &= ~0x00000008u;
+  _has_bits_[0] &= ~0x00000020u;
 }
 inline void FragmentRecordBatch::clear_def() {
   if (def_ != NULL) def_->::exec::shared::RecordBatchDef::Clear();
@@ -626,15 +646,15 @@ inline void FragmentRecordBatch::set_allocated_def(::exec::shared::RecordBatchDe
   }
 }
 
-// optional bool isLastBatch = 5;
+// optional bool isLastBatch = 7;
 inline bool FragmentRecordBatch::has_islastbatch() const {
-  return (_has_bits_[0] & 0x00000010u) != 0;
+  return (_has_bits_[0] & 0x00000040u) != 0;
 }
 inline void FragmentRecordBatch::set_has_islastbatch() {
-  _has_bits_[0] |= 0x00000010u;
+  _has_bits_[0] |= 0x00000040u;
 }
 inline void FragmentRecordBatch::clear_has_islastbatch() {
-  _has_bits_[0] &= ~0x00000010u;
+  _has_bits_[0] &= ~0x00000040u;
 }
 inline void FragmentRecordBatch::clear_islastbatch() {
   islastbatch_ = false;
@@ -648,15 +668,15 @@ inline void FragmentRecordBatch::set_islastbatch(bool value) {
   islastbatch_ = value;
 }
 
-// optional bool isOutOfMemory = 6 [default = false];
+// optional bool isOutOfMemory = 8 [default = false];
 inline bool FragmentRecordBatch::has_isoutofmemory() const {
-  return (_has_bits_[0] & 0x00000020u) != 0;
+  return (_has_bits_[0] & 0x00000080u) != 0;
 }
 inline void FragmentRecordBatch::set_has_isoutofmemory() {
-  _has_bits_[0] |= 0x00000020u;
+  _has_bits_[0] |= 0x00000080u;
 }
 inline void FragmentRecordBatch::clear_has_isoutofmemory() {
-  _has_bits_[0] &= ~0x00000020u;
+  _has_bits_[0] &= ~0x00000080u;
 }
 inline void FragmentRecordBatch::clear_isoutofmemory() {
   isoutofmemory_ = false;

http://git-wip-us.apache.org/repos/asf/drill/blob/4f213570/contrib/native/client/src/protobuf/User.pb.cc
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/protobuf/User.pb.cc b/contrib/native/client/src/protobuf/User.pb.cc
index d85c81b..360becb 100644
--- a/contrib/native/client/src/protobuf/User.pb.cc
+++ b/contrib/native/client/src/protobuf/User.pb.cc
@@ -220,14 +220,14 @@ void protobuf_AddDesc_User_2eproto() {
     "lts_mode\030\001 \001(\0162\033.exec.user.QueryResultsM"
     "ode\022$\n\004type\030\002 \001(\0162\026.exec.shared.QueryTyp"
     "e\022\014\n\004plan\030\003 \001(\t\")\n\022BitToUserHandshake\022\023\n"
-    "\013rpc_version\030\002 \001(\005*\270\001\n\007RpcType\022\r\n\tHANDSH"
+    "\013rpc_version\030\002 \001(\005*\310\001\n\007RpcType\022\r\n\tHANDSH"
     "AKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\r\n\tRUN_QUERY"
     "\020\003\022\020\n\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULTS\020"
-    "\005\022\020\n\014QUERY_RESULT\020\006\022\020\n\014QUERY_HANDLE\020\007\022\026\n"
-    "\022REQ_META_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION_L"
-    "IST\020\t*#\n\020QueryResultsMode\022\017\n\013STREAM_FULL"
-    "\020\001B+\n\033org.apache.drill.exec.protoB\nUserP"
-    "rotosH\001", 927);
+    "\005\022\016\n\nQUERY_DATA\020\006\022\020\n\014QUERY_HANDLE\020\007\022\026\n\022R"
+    "EQ_META_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION_LIS"
+    "T\020\t\022\020\n\014QUERY_RESULT\020\n*#\n\020QueryResultsMod"
+    "e\022\017\n\013STREAM_FULL\020\001B+\n\033org.apache.drill.e"
+    "xec.protoB\nUserProtosH\001", 943);
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
     "User.proto", &protobuf_RegisterTypes);
   Property::default_instance_ = new Property();
@@ -267,6 +267,7 @@ bool RpcType_IsValid(int value) {
     case 7:
     case 8:
     case 9:
+    case 10:
       return true;
     default:
       return false;

http://git-wip-us.apache.org/repos/asf/drill/blob/4f213570/contrib/native/client/src/protobuf/User.pb.h
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/protobuf/User.pb.h b/contrib/native/client/src/protobuf/User.pb.h
index eca199d..69daf50 100644
--- a/contrib/native/client/src/protobuf/User.pb.h
+++ b/contrib/native/client/src/protobuf/User.pb.h
@@ -51,14 +51,15 @@ enum RpcType {
   RUN_QUERY = 3,
   CANCEL_QUERY = 4,
   REQUEST_RESULTS = 5,
-  QUERY_RESULT = 6,
+  QUERY_DATA = 6,
   QUERY_HANDLE = 7,
   REQ_META_FUNCTIONS = 8,
-  RESP_FUNCTION_LIST = 9
+  RESP_FUNCTION_LIST = 9,
+  QUERY_RESULT = 10
 };
 bool RpcType_IsValid(int value);
 const RpcType RpcType_MIN = HANDSHAKE;
-const RpcType RpcType_MAX = RESP_FUNCTION_LIST;
+const RpcType RpcType_MAX = QUERY_RESULT;
 const int RpcType_ARRAYSIZE = RpcType_MAX + 1;
 
 const ::google::protobuf::EnumDescriptor* RpcType_descriptor();


Mime
View raw message