drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [2/2] drill git commit: DRILL-4313: C++ Client - Thread safe Logging. Improved Drill bit selection. - Update random drill bit selection. Shuffle the list initially, then round robin. Add Utility methods to get random numbers and to shuffle and add vect
Date Tue, 08 Mar 2016 02:04:46 GMT
DRILL-4313: C++ Client - Thread safe Logging.  Improved Drill bit selection.
 - Update random drill bit selection. Shuffle the list initially, then round robin. Add Utility methods to get random numbers and to shuffle and add vectors. Whitespace cleanup
 - Add Git properties to build and print to log.
 - Add interface to get error based on query handle.
 - Add support for Pooled connections. Allows switching between pooled and unpooled connections based on environment variables


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/df0f0af3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/df0f0af3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/df0f0af3

Branch: refs/heads/master
Commit: df0f0af3d963c1b65eb01c3141fe84532c53f5a5
Parents: a2fec78
Author: Parth Chandra <parthc@apache.org>
Authored: Fri Feb 12 15:42:53 2016 -0800
Committer: Parth Chandra <parthc@apache.org>
Committed: Mon Mar 7 17:49:50 2016 -0800

----------------------------------------------------------------------
 contrib/native/client/CMakeLists.txt            |  24 +-
 .../client/cmakeModules/FindZookeeper.cmake     |   2 +-
 .../native/client/example/querySubmitter.cpp    |  25 +-
 .../native/client/src/clientlib/drillClient.cpp |  33 +-
 .../client/src/clientlib/drillClientImpl.cpp    | 600 ++++++++++++++-----
 .../client/src/clientlib/drillClientImpl.hpp    | 169 +++++-
 contrib/native/client/src/clientlib/env.h.in    |  26 +
 contrib/native/client/src/clientlib/errmsgs.cpp |   2 +
 contrib/native/client/src/clientlib/errmsgs.hpp |   4 +-
 contrib/native/client/src/clientlib/logger.cpp  | 126 ++--
 contrib/native/client/src/clientlib/logger.hpp  |  85 +--
 contrib/native/client/src/clientlib/utils.cpp   | 109 ++--
 contrib/native/client/src/clientlib/utils.hpp   | 100 +++-
 .../native/client/src/include/drill/common.hpp  |   9 +-
 .../client/src/include/drill/drillClient.hpp    |   7 +-
 15 files changed, 1001 insertions(+), 320 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/CMakeLists.txt b/contrib/native/client/CMakeLists.txt
index 603586d..b22af42 100644
--- a/contrib/native/client/CMakeLists.txt
+++ b/contrib/native/client/CMakeLists.txt
@@ -22,8 +22,20 @@ project(drillclient)
 message("Project Dir = ${PROJECT_SOURCE_DIR}")
 message("Source Dir = ${CMAKE_SOURCE_DIR} ")
 
+cmake_policy(SET CMP0043 NEW)
+
 set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmakeModules/")
 
+# Get the latest git commit properties of the working branch
+execute_process(
+    COMMAND git log -1 --format="\\nCommit: %H \\nDescription: %s \\nAuthor: %aN Date: %ai"
+    WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
+    OUTPUT_VARIABLE GIT_COMMIT_PROP
+    OUTPUT_STRIP_TRAILING_WHITESPACE
+    )
+add_definitions("-DGIT_COMMIT_PROP=${GIT_COMMIT_PROP}")
+
+
 
 # Find Boost
 if(MSVC)
@@ -36,7 +48,7 @@ else()
     set(Boost_USE_STATIC_RUNTIME OFF)
 endif()
 
-find_package(Boost 1.53.0 REQUIRED COMPONENTS regex system date_time chrono thread )
+find_package(Boost 1.53.0 REQUIRED COMPONENTS regex system date_time chrono thread random)
 include_directories(${Boost_INCLUDE_DIRS})
 
 if(CMAKE_COMPILER_IS_GNUCXX)
@@ -63,6 +75,16 @@ include_directories(${PROTOBUF_INCLUDE_DIR})
 #Find Zookeeper
 find_package(Zookeeper  REQUIRED )
 
+
+# Generated sources
+configure_file(
+    ${CMAKE_SOURCE_DIR}/src/clientlib/env.h.in
+    ${CMAKE_BINARY_DIR}/generated/env.h
+    )
+
+include_directories(${CMAKE_BINARY_DIR}/generated)
+
+
 #
 #   TARGETS
 #

http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/cmakeModules/FindZookeeper.cmake
----------------------------------------------------------------------
diff --git a/contrib/native/client/cmakeModules/FindZookeeper.cmake b/contrib/native/client/cmakeModules/FindZookeeper.cmake
index fd8247f..151c05c 100644
--- a/contrib/native/client/cmakeModules/FindZookeeper.cmake
+++ b/contrib/native/client/cmakeModules/FindZookeeper.cmake
@@ -40,7 +40,7 @@ if (MSVC)
         message("- CMAKE will look for zookeeper library files in $ZOOKEEPER_HOME/src/c/Debug or $ZOOKEEPER_HOME/src/c/Release.")
     else()
         FILE(TO_CMAKE_PATH ${ZOOKEEPER_HOME} Zookeeper_HomePath)
-        set(Zookeeper_LIB_PATHS ${Zookeeper_HomePath}/src/c/${ZK_BuildOutputDir})
+        set(Zookeeper_LIB_PATHS ${Zookeeper_HomePath}/src/c/${ZK_BuildOutputDir} ${Zookeeper_HomePath}/src/c/x64/${ZK_BuildOutputDir} )
 
         find_path(ZK_INCLUDE_DIR zookeeper.h ${Zookeeper_HomePath}/src/c/include)
         find_path(ZK_INCLUDE_DIR_GEN zookeeper.jute.h ${Zookeeper_HomePath}/src/c/generated)

http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 960ff4f..d507d1b 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -20,6 +20,7 @@
 #include <iostream>
 #include <stdio.h>
 #include <stdlib.h>
+#include <boost/thread.hpp>
 #include "drill/drillc.hpp"
 
 int nOptions=13;
@@ -65,11 +66,13 @@ Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, Drill::Dril
     }
 }
 
+boost::mutex listenerMutex;
 Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::DrillClientError* err){
     // Invariant:
     // (received an record batch and err is NULL)
     // or
     // (received query state message passed by `err` and b is NULL)
+    boost::lock_guard<boost::mutex> listenerLock(listenerMutex);
     if(!err){
         if(b!=NULL){
             b->print(std::cout, 0); // print all rows
@@ -317,16 +320,24 @@ int main(int argc, char* argv[]) {
         std::vector<Drill::QueryHandle_t*>::iterator queryHandleIter;
 
         Drill::DrillClient client;
-        // To log to file
-        //DrillClient::initLogging("/var/log/drill/", l);
+#if defined _WIN32 || defined _WIN64
+        TCHAR tempPath[MAX_PATH];
+        GetTempPath(MAX_PATH, tempPath);
+		char logpathPrefix[MAX_PATH + 128];
+		strcpy(logpathPrefix,tempPath);
+		strcat(logpathPrefix, "\\drillclient");
+#else
+		char* logpathPrefix = "/var/log/drill/drillclient";
+#endif
+		// To log to file
+        Drill::DrillClient::initLogging(logpathPrefix, l);
         // To log to stderr
-        Drill::DrillClient::initLogging(NULL, l);
-        //Drill::DrillClientConfig::setBufferLimit(2*1024*1024); // 2MB. Allows us to hold at least two record batches.
-        int nQueries=queryInputs.size();
-        Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB per query. Allows us to hold at least two record batches.
+        //Drill::DrillClient::initLogging(NULL, l);
 
+        int nQueries=queryInputs.size();
+        Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB per query. The size of a record batch may vary, but is unlikely to exceed the 256 MB which is the default. 
 
-        if (!hshakeTimeout.empty()){
+        if(!hshakeTimeout.empty()){
             Drill::DrillClientConfig::setHandshakeTimeout(atoi(hshakeTimeout.c_str()));
         }
         if (!queryTimeout.empty()){

http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 7087938..92c5194 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-
+#include <stdlib.h>
 #include <boost/assign.hpp>
 #include "drill/common.hpp"
 #include "drill/drillClient.hpp"
@@ -56,21 +56,22 @@ int32_t DrillClientConfig::s_heartbeatFrequency=15; // 15 seconds
 boost::mutex DrillClientConfig::s_mutex;
 
 DrillClientConfig::DrillClientConfig(){
-    initLogging(NULL);
+    // Do not initialize logging. The Logger object is static and may 
+    // not have been initialized yet
+    //initLogging(NULL);
 }
 
 DrillClientConfig::~DrillClientConfig(){
-    Logger::close();
 }
 
 void DrillClientConfig::initLogging(const char* path){
-    Logger::init(path);
+    getLogger().init(path);
 }
 
 void DrillClientConfig::setLogLevel(logLevel_t l){
     boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
     s_logLevel=l;
-    Logger::s_level=l;
+    getLogger().m_level=l;
     //boost::log::core::get()->set_filter(boost::log::trivial::severity >= s_logLevel);
 }
 
@@ -163,7 +164,7 @@ RecordIterator::~RecordIterator(){
     delete this->m_pQueryResult;
     this->m_pQueryResult=NULL;
     if(this->m_pCurrentRecordBatch!=NULL){
-        DRILL_LOG(LOG_TRACE) << "Deleted last Record batch " << (void*) m_pCurrentRecordBatch << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted last Record batch " << (void*) m_pCurrentRecordBatch << std::endl;)
         delete this->m_pCurrentRecordBatch; this->m_pCurrentRecordBatch=NULL;
     }
 }
@@ -224,7 +225,7 @@ status_t RecordIterator::next(){
         if(this->m_pCurrentRecordBatch==NULL || this->m_currentRecord==this->m_pCurrentRecordBatch->getNumRecords()){
             boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex);
             if(this->m_pCurrentRecordBatch !=NULL){
-                DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;)
                 delete this->m_pCurrentRecordBatch; //free the previous record batch
                 this->m_pCurrentRecordBatch=NULL;
             }
@@ -235,12 +236,12 @@ status_t RecordIterator::next(){
             }
             this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext();
             if(this->m_pCurrentRecordBatch != NULL){
-                DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;)
             }else{
-                DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl;)
             }
             if(this->m_pCurrentRecordBatch==NULL || this->m_pCurrentRecordBatch->getNumRecords()==0){
-                DRILL_LOG(LOG_TRACE) << "No more data." << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more data." << std::endl;)
                 ret = QRY_NO_MORE_DATA;
             }else if(this->m_pCurrentRecordBatch->hasSchemaChanged()){
                 ret=QRY_SUCCESS_WITH_INFO;
@@ -315,7 +316,12 @@ void DrillClient::initLogging(const char* path, logLevel_t l){
 }
 
 DrillClient::DrillClient(){
-    this->m_pImpl=new DrillClientImpl;
+    const char* enablePooledClient=std::getenv(ENABLE_CONNECTION_POOL_ENV);
+    if(enablePooledClient!=NULL && atoi(enablePooledClient)!=0){
+        this->m_pImpl=new PooledDrillClientImpl;
+    }else{
+        this->m_pImpl=new DrillClientImpl;
+    }
 }
 
 DrillClient::~DrillClient(){
@@ -378,10 +384,12 @@ RecordIterator* DrillClient::submitQuery(Drill::QueryType t, const std::string&
 }
 
 void* DrillClient::getApplicationContext(QueryHandle_t handle){
+    assert(handle!=NULL);
     return ((DrillClientQueryResult*)handle)->getListenerContext();
 }
 
 status_t DrillClient::getQueryStatus(QueryHandle_t handle){
+    assert(handle!=NULL);
     return ((DrillClientQueryResult*)handle)->getQueryStatus();
 }
 
@@ -389,6 +397,9 @@ std::string& DrillClient::getError(){
     return m_pImpl->getError()->msg;
 }
 
+const std::string& DrillClient::getError(QueryHandle_t handle){
+    return ((DrillClientQueryResult*)handle)->getError()->msg;
+}
 
 void DrillClient::waitForResults(){
     this->m_pImpl->waitForResults();

http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index d4e9ed9..3ec01f5 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -78,47 +78,46 @@ void setSocketTimeout(boost::asio::ip::tcp::socket& socket, int32_t timeout){
 #endif
 }
 
-
-void DrillClientImpl::parseConnectStr(const char* connectStr,
-        std::string& pathToDrill,
-        std::string& protocol,
-        std::string& hostPortStr){
-    char u[MAX_CONNECT_STR+1];
-    strncpy(u,connectStr, MAX_CONNECT_STR); u[MAX_CONNECT_STR]=0;
-    char* z=strtok(u, "=");
-    char* c=strtok(NULL, "/");
-    char* p=strtok(NULL, "");
-
-    if(p!=NULL) pathToDrill=std::string("/")+p;
-    protocol=z; hostPortStr=c;
-    return;
-}
-
 connectionStatus_t DrillClientImpl::connect(const char* connStr){
     std::string pathToDrill, protocol, hostPortStr;
     std::string host;
     std::string port;
     if(!this->m_bIsConnected){
-        parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+        m_connectStr=connStr;
+        Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
         if(!strcmp(protocol.c_str(), "zk")){
             ZookeeperImpl zook;
-            if(zook.connectToZookeeper(hostPortStr.c_str(), pathToDrill.c_str())!=0){
+            std::vector<std::string> drillbits;
+            int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
+            if(!err){
+                Utils::shuffle(drillbits);
+                exec::DrillbitEndpoint endpoint;
+                err = zook.getEndPoint(drillbits, drillbits.size()-1, endpoint);// get the last one in the list
+                if(!err){
+                    host=boost::lexical_cast<std::string>(endpoint.address());
+                    port=boost::lexical_cast<std::string>(endpoint.user_port());
+                }
+            }
+            if(err){
                 return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
             }
-            zook.debugPrint();
-            exec::DrillbitEndpoint e=zook.getEndPoint();
-            host=boost::lexical_cast<std::string>(e.address());
-            port=boost::lexical_cast<std::string>(e.user_port());
             zook.close();
+            m_bIsDirectConnection=true;  
         }else if(!strcmp(protocol.c_str(), "local")){
+            boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant
             char tempStr[MAX_CONNECT_STR+1];
             strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
             host=strtok(tempStr, ":");
             port=strtok(NULL, "");
+            m_bIsDirectConnection=false;  
         }else{
             return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
         }
-        return this->connect(host.c_str(), port.c_str());
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;)
+        connectionStatus_t ret = this->connect(host.c_str(), port.c_str());
+        return ret;
+    }else if(std::strcmp(connStr, m_connectStr.c_str())){ // tring to connect to a different address is not allowed if already connected
+        return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
     }
     return CONN_SUCCESS;
 }
@@ -133,7 +132,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
         tcp::resolver::iterator end;
         while (iter != end){
             endpoint = *iter++;
-            DRILL_LOG(LOG_TRACE) << endpoint << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << endpoint << std::endl;)
         }
         boost::system::error_code ec;
         m_socket.connect(endpoint, ec);
@@ -149,6 +148,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
         return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what()));
     }
 
+    m_bIsConnected=true;
     // set socket keep alive
     boost::asio::socket_base::keep_alive keepAlive(true);
     m_socket.set_option(keepAlive);
@@ -156,35 +156,34 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
     boost::asio::ip::tcp::no_delay noDelay(true);
     m_socket.set_option(noDelay);
 
-    //
-    // We put some OS dependent code here for timing out a socket. Mostly, this appears to
-    // do nothing. Should we leave it in there?
-    //
-    setSocketTimeout(m_socket, DrillClientConfig::getSocketTimeout());
-
+    std::ostringstream connectedHost;
+    connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port;
+    m_connectedHost = connectedHost.str();
+    DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;)
+    
     return CONN_SUCCESS;
 }
 
 void DrillClientImpl::startHeartbeatTimer(){
-    DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
-        << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
+        << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl;)
     m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency()));
     m_heartbeatTimer.async_wait(boost::bind(
                 &DrillClientImpl::handleHeartbeatTimeout,
                 this,
                 boost::asio::placeholders::error
                 ));
-	    startMessageListener(); // start this thread early so we don't have the timer blocked
+        startMessageListener(); // start this thread early so we don't have the timer blocked
 }
 
 connectionStatus_t DrillClientImpl::sendHeartbeat(){
-	connectionStatus_t status=CONN_SUCCESS;
+    connectionStatus_t status=CONN_SUCCESS;
     exec::rpc::Ack ack;
     ack.set_ok(true);
     OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack);
-	boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
-	boost::lock_guard<boost::mutex> lock(m_dcMutex);
-    DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;
+    boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
+    boost::lock_guard<boost::mutex> lock(m_dcMutex);
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;)
     status=sendSync(heartbeatMsg);
     status=status==CONN_SUCCESS?status:CONN_DEAD;
     //If the server sends responses to a heartbeat, we need to increment the pending requests counter.
@@ -196,21 +195,19 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){
 
 void DrillClientImpl::resetHeartbeatTimer(){
     m_heartbeatTimer.cancel();
-    DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;)
     startHeartbeatTimer();
 }
 
-
-
 void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & err){
-    DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;)
     if(err != boost::asio::error::operation_aborted){
         // Check whether the deadline has passed.
-        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer -  Expires at: " 
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer -  Expires at: " 
             << to_simple_string(m_heartbeatTimer.expires_at())
             << " and time now is: "
             << to_simple_string(boost::asio::deadline_timer::traits_type::now())
-            << std::endl;
+            << std::endl;)
             ;
         if (m_heartbeatTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
             // The deadline has passed.
@@ -219,7 +216,7 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e
                 startHeartbeatTimer();
             }else{
                 // Close connection.
-                DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";)
                 shutdownSocket();
             }
         }
@@ -227,7 +224,6 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e
     return;
 }
 
-
 void DrillClientImpl::Close() {
     shutdownSocket();
 }
@@ -257,8 +253,8 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
                     this,
                     boost::asio::placeholders::error
                     ));
-        DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with "
-                << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with "
+                << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl;)
     }
 
     async_read(
@@ -271,7 +267,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred)
             );
-    DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n";
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n";)
     m_io_service.run();
     if(m_rbuf!=NULL){
         Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
@@ -292,7 +288,7 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
     boost::system::error_code error=err;
     // cancel the timer
     m_deadlineTimer.cancel();
-    DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;)
     if(!error){
         InBoundRpcMessage msg;
         uint32_t length = 0;
@@ -306,14 +302,14 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
                         boost::asio::buffer(b, bytesToRead),
                         error);
                 if(err) break;
-                DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl;)
                 if(dataBytesRead==bytesToRead) break;
                 bytesToRead-=dataBytesRead;
                 b+=dataBytesRead;
             }
             DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg);
         }else{
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";)
             handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake"));
             return;
         }
@@ -344,7 +340,7 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code &
         if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
             // The deadline has passed.
             m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";)
             handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT));
             m_io_service.stop();
             boost::system::error_code ignorederr;
@@ -356,7 +352,7 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code &
 
 connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* properties){
 
-    DRILL_LOG(LOG_TRACE) << "validateHandShake\n";
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "validateHandShake\n";)
 
     exec::user::UserToBitHandshake u2b;
     u2b.set_channel(exec::shared::USER);
@@ -368,7 +364,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
         std::string username;
         std::string err;
         if(!properties->validate(err)){
-            DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl;)
         }
         exec::user::UserProperties* userProperties = u2b.mutable_properties();
 
@@ -376,8 +372,8 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
         for(size_t i=0; i<properties->size(); i++){
             std::map<std::string,uint32_t>::const_iterator it=DrillUserProperties::USER_PROPERTIES.find(properties->keyAt(i));
             if(it==DrillUserProperties::USER_PROPERTIES.end()){
-                DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i) 
-                    << ") is unknown and is being skipped" << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i) 
+                    << ") is unknown and is being skipped" << std::endl;)
                 continue;
             }
             if(IS_BITSET((*it).second,USERPROP_FLAGS_SERVERPROP)){
@@ -392,9 +388,9 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
                     //u2b.set_credentials(&creds);
                 }
                 if(IS_BITSET((*it).second,USERPROP_FLAGS_PASSWORD)){
-                    DRILL_LOG(LOG_INFO) <<  properties->keyAt(i) << ": ********** " << std::endl;
+                    DRILL_MT_LOG(DRILL_LOG(LOG_INFO) <<  properties->keyAt(i) << ": ********** " << std::endl;)
                 }else{
-                    DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ":" << properties->valueAt(i) << std::endl;
+                    DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ":" << properties->valueAt(i) << std::endl;)
                 }
             }// Server properties
         }
@@ -406,7 +402,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
 
         OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
         sendSync(out_msg);
-        DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";)
     }
 
     connectionStatus_t ret = recvHandshake();
@@ -416,21 +412,21 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
     if(this->m_handshakeStatus != exec::user::SUCCESS){
         switch(this->m_handshakeStatus){
             case exec::user::RPC_VERSION_MISMATCH:
-                DRILL_LOG(LOG_TRACE) << "Invalid rpc version.  Expected "
-                    << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Invalid rpc version.  Expected "
+                    << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;)
                 return handleConnError(CONN_BAD_RPC_VER,
                         getMessage(ERR_CONN_BAD_RPC_VER, DRILL_RPC_VERSION,
                             m_handshakeVersion,
                             this->m_handshakeErrorId.c_str(),
                             this->m_handshakeErrorMsg.c_str()));
             case exec::user::AUTH_FAILED:
-                DRILL_LOG(LOG_TRACE) << "Authentication failed." << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Authentication failed." << std::endl;)
                 return handleConnError(CONN_AUTH_FAILED,
                         getMessage(ERR_CONN_AUTHFAIL,
                             this->m_handshakeErrorId.c_str(),
                             this->m_handshakeErrorMsg.c_str()));
             case exec::user::UNKNOWN_FAILURE:
-                DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << std::endl;)
                 return handleConnError(CONN_HANDSHAKE_FAILED,
                         getMessage(ERR_CONN_UNKNOWN_ERR,
                             this->m_handshakeErrorId.c_str(),
@@ -451,14 +447,14 @@ void DrillClientImpl::startMessageListener() {
     if(this->m_pListenerThread==NULL){
         // Stopping the io_service from running out-of-work
         if(m_io_service.stopped()){
-            DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <<std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <<std::endl;)
             m_io_service.reset();
         }
         this->m_pWork = new boost::asio::io_service::work(m_io_service);
         this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run,
                     &this->m_io_service));
-        DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: "
-            << this->m_pListenerThread << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: "
+            << this->m_pListenerThread << std::endl;)
     }
 }
 
@@ -480,22 +476,23 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
         OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, coordId, &query);
         sendSync(out_msg);
 
-        pQuery = new DrillClientQueryResult(this, coordId);
+        pQuery = new DrillClientQueryResult(this, coordId, plan);
         pQuery->registerListener(l, lCtx);
         bool sendRequest=false;
         this->m_queryIds[coordId]=pQuery;
 
-        DRILL_LOG(LOG_DEBUG)  << "Sent query request. Coordination id = " << coordId << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query request. " << "[" << m_connectedHost << "]"  << "Coordination id = " << coordId << std::endl;)
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query " <<  "Coordination id = " << coordId << " query: " << plan << std::endl;)
 
         if(m_pendingRequests++==0){
             sendRequest=true;
         }else{
-            DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;
-            DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;)
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;)
         }
         if(sendRequest){
-            DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = "
-                << m_pendingRequests << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = "
+                << m_pendingRequests << std::endl;)
             getNextResult(); // async wait for results
         }
     }
@@ -513,7 +510,7 @@ void DrillClientImpl::getNextResult(){
 
     {
         boost::unique_lock<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex);
-        DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl;)
         while(AllocatedBuffer::s_isBufferLimitReached){
             AllocatedBuffer::s_memCV.wait(memLock);
         }
@@ -522,8 +519,8 @@ void DrillClientImpl::getNextResult(){
     //use free, not delete to free
     ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
     if (DrillClientConfig::getQueryTimeout() > 0){
-        DRILL_LOG(LOG_TRACE) << "Started new query wait timer with "
-                << DrillClientConfig::getQueryTimeout() << " seconds." << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new query wait timer with "
+                << DrillClientConfig::getQueryTimeout() << " seconds." << std::endl;)
         m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getQueryTimeout()));
         m_deadlineTimer.async_wait(boost::bind(
             &DrillClientImpl::handleReadTimeout,
@@ -544,7 +541,7 @@ void DrillClientImpl::getNextResult(){
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred)
             );
-    DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n";
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n";)
 }
 
 void DrillClientImpl::waitForResults(){
@@ -565,8 +562,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
         InBoundRpcMessage& msg,
         boost::system::error_code& error){
 
-    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer "
-        <<  reinterpret_cast<int*>(_buf) << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer "
+        <<  reinterpret_cast<int*>(_buf) << std::endl;)
     size_t leftover=0;
     uint32_t rmsgLen;
     AllocatedBufferPtr currentBuffer;
@@ -576,15 +573,15 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
         // but we don't have to keep the lock while we decode the rest of the buffer.
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         int bytes_read = DrillClientImpl::s_decoder.LengthDecode(_buf, &rmsgLen);
-        DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;
-        DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;)
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;)
 
         if(rmsgLen>0){
             leftover = LEN_PREFIX_BUFLEN - bytes_read;
             // Allocate a buffer
             currentBuffer=new AllocatedBuffer(rmsgLen);
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ "
-                << currentBuffer << ", size = " << rmsgLen << " ]\n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ "
+                << currentBuffer << ", size = " << rmsgLen << " ]\n";)
             if(currentBuffer==NULL){
                 Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
                 return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
@@ -593,8 +590,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
             if(leftover){
                 memcpy(currentBuffer->m_pBuffer, _buf + bytes_read, leftover);
             }
-            DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
-                << (rmsgLen - leftover) << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
+                << (rmsgLen - leftover) << std::endl;)
             ByteBuf_t b=currentBuffer->m_pBuffer + leftover;
             size_t bytesToRead=rmsgLen - leftover;
               
@@ -603,7 +600,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
                         boost::asio::buffer(b, bytesToRead),
                         error);
                 if(error) break;
-                DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;)
                 if(dataBytesRead==bytesToRead) break;
                 bytesToRead-=dataBytesRead;
                 b+=dataBytesRead;
@@ -612,7 +609,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
             if(!error){
                 // read data successfully
                 DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg);
-                DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;)
             }else{
                 Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
                 return handleQryError(QRY_COMM_ERROR,
@@ -624,8 +621,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
             return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
         }
     }
-    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer "
-        <<  reinterpret_cast<int*>(_buf) << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer "
+        <<  reinterpret_cast<int*>(_buf) << std::endl;)
     Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
     return QRY_SUCCESS;
 }
@@ -639,9 +636,9 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         exec::shared::QueryResult qr;
 
-        DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;)
         qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
-        DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;)
         
         qid.CopyFrom(qr.query_id());
         
@@ -657,7 +654,7 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
                 std::string valErr;
                 if( (ret=validateResultMessage(msg, qr, valErr)) != QRY_SUCCESS){
                     delete allocatedBuffer;
-                    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;
+                    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;)
                     return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
                 }
                 ret=processQueryStatusResult(&qr, pDrillClientQueryResult);
@@ -665,9 +662,9 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
                 // We've received the final message for a query that has been cancelled
                 // or for which the resources have been freed. We no longer need to listen
                 // for more incoming messages for such a query.
-                DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl;)
                 m_pendingRequests--;
-                DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl;)
                 ret=QRY_CANCELED;
             }
             delete allocatedBuffer;
@@ -676,10 +673,10 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
             // Normal query results come back with query_state not set.
             // Actually this is not strictly true. The query state is set to
             // 0(i.e. PENDING), but protobuf thinks this means the value is not set.
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";)
         }
     }
-    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl;)
     if(m_pendingRequests==0){
         // signal any waiting client that it can exit because there are no more any query results to arrive.
         // We keep the heartbeat going though.
@@ -701,21 +698,21 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         exec::shared::QueryData* qr = new exec::shared::QueryData; //Record Batch will own this object and free it up.
 
-        DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl;)
         qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
-        DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;)
 
         qid.CopyFrom(qr->query_id());
         if(qid.part1()==0){
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;)
             delete allocatedBuffer;
             return QRY_SUCCESS;
         }
 
         pDrillClientQueryResult=findQueryResult(qid);
         if(pDrillClientQueryResult==NULL){
-            DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" 
-                                 << debugPrintQid(qid) << ")." << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" 
+                                 << debugPrintQid(qid) << ")." << std::endl;)
             delete qr;
             delete allocatedBuffer;
             return ret;
@@ -726,23 +723,23 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
         if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
             delete allocatedBuffer;
             delete qr;
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";)
             pDrillClientQueryResult->setQueryStatus(ret);
             return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
         }
 
         //Build Record Batch here
-        DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;)
 
         pRecordBatch= new RecordBatch(qr, allocatedBuffer,  msg.m_dbody);
         pDrillClientQueryResult->m_numBatches++;
 
-        DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;)
         pRecordBatch->build();
-        DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
-            << pRecordBatch->getNumRecords()  << std::endl;
-        DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields "
-            << pRecordBatch->getNumFields()  << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
+            << pRecordBatch->getNumRecords()  << std::endl;)
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields "
+            << pRecordBatch->getNumFields()  << std::endl;)
 
         ret=pDrillClientQueryResult->setupColumnDefs(qr);
         if(ret==QRY_SUCCESS_WITH_INFO){
@@ -752,8 +749,8 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
         pDrillClientQueryResult->setIsQueryPending(true);
         pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener;
         if(pDrillClientQueryResult->m_bIsLastChunk){
-            DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
-                <<  "Received last batch. " << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
+                <<  "Received last batch. " << std::endl;)
             ret=QRY_NO_MORE_DATA;
         }
         pDrillClientQueryResult->setQueryStatus(ret);
@@ -770,7 +767,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
         // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are
         // pushed on the wire before the cancel is processed.
         pDrillClientQueryResult->setIsQueryPending(false);
-        DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;)
         pDrillClientQueryResult->setQueryStatus(ret);
         clearMapEntries(pDrillClientQueryResult);
         return ret;
@@ -780,27 +777,27 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr  allocatedBuffer,
 
 status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
-    DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;)
     status_t ret=QRY_SUCCESS;
 
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     std::map<int,DrillClientQueryResult*>::iterator it;
     for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){
         std::string qidString = it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL");
-        DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first
-        << " QueryId: "<< qidString << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first
+        << " QueryId: "<< qidString << std::endl;)
     }
     if(msg.m_coord_id==0){
-        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
         return QRY_SUCCESS;
     }
     it=this->m_queryIds.find(msg.m_coord_id);
     if(it!=this->m_queryIds.end()){
         pDrillClientQueryResult=(*it).second;
         exec::shared::QueryId *qid = new exec::shared::QueryId;
-        DRILL_LOG(LOG_TRACE)  << "Received Query Handle " << msg.m_pbody.size() << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received Query Handle " << msg.m_pbody.size() << std::endl;)
         qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
-        DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;)
         m_queryResults[qid]=pDrillClientQueryResult;
         //save queryId allocated here so we can free it later
         pDrillClientQueryResult->setQueryId(qid);
@@ -814,20 +811,20 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
 
 DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId& qid){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
-    DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;)
     std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
-    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;)
     if(m_queryResults.size() != 0){
         for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":"
-                << it->first->part2() << "]\n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":"
+                << it->first->part2() << "]\n";)
         }
     }
     it=this->m_queryResults.find(&qid);
     if(it!=this->m_queryResults.end()){
         pDrillClientQueryResult=(*it).second;
-        DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
-            debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
+            debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;)
     }
     return pDrillClientQueryResult;
 }
@@ -870,7 +867,7 @@ status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr
             break;
         default:
             {
-                DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";)
                 ret=handleQryError(QRY_INTERNAL_ERROR,
                         getMessage(ERR_QRY_UNKQRYSTATE),
                         pDrillClientQueryResult);
@@ -887,7 +884,7 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
         // Check whether the deadline has passed.
         if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
             // The deadline has passed.
-            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n";
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n";)
             handleQryError(QRY_TIMEOUT, getMessage(ERR_QRY_TIMOUT), NULL);
             // There is no longer an active deadline. The expiry is set to positive
             // infinity so that the timer never expires until a new deadline is set.
@@ -913,18 +910,18 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
         const boost::system::error_code& err,
         size_t bytes_transferred) {
     boost::system::error_code error=err;
-    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
-        <<  reinterpret_cast<int*>(_buf) << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
+        <<  reinterpret_cast<int*>(_buf) << std::endl;)
     if(DrillClientConfig::getQueryTimeout() > 0){
         // Cancel the timeout if handleRead is called
-        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";)
         m_deadlineTimer.cancel();
     }
     if(!error){
         InBoundRpcMessage msg;
         boost::lock_guard<boost::mutex> lock(this->m_prMutex);
 
-        DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;)
         AllocatedBufferPtr allocatedBuffer=NULL;
 
         if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){
@@ -938,14 +935,14 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
         if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw it away
             m_pendingRequests--;
             delete allocatedBuffer;
-            DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " <<  std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " <<  std::endl;)
             if(m_pendingRequests!=0){
                 boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
                 getNextResult();
             }else{
-				boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
-                DRILL_LOG(LOG_TRACE) << "No more results expected from server. " <<  std::endl;
-				m_cv.notify_one();
+                boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " <<  std::endl;)
+                m_cv.notify_one();
             }
             return;
         }else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){
@@ -988,7 +985,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
                 // We have a socket read error, but we do not know which query this is for.
                 // Signal ALL pending queries that they should stop waiting.
                 delete allocatedBuffer;
-                DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;)
                 handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
                 return;
             }else{
@@ -997,20 +994,20 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
                 // We should properly handle these handshake requests/responses
                 if(msg.has_rpc_type() && msg.m_rpc_type==exec::user::HANDSHAKE){
                     if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){
-                        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";
+                        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";)
                         exec::user::UserToBitHandshake u2b;
                         u2b.set_channel(exec::shared::USER);
                         u2b.set_rpc_version(DRILL_RPC_VERSION);
                         u2b.set_support_listening(true);
                         OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b);
                         sendSync(out_msg);
-                        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";
+                        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";)
                     }else{
-                        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";
+                        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";)
                     }
                 }else{
-                    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
-                        << "QueryResult returned " << msg.m_rpc_type << std::endl;
+                    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+                        << "QueryResult returned " << msg.m_rpc_type << std::endl;)
                     handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
                 }
                 delete allocatedBuffer;
@@ -1025,8 +1022,8 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
         // boost error
         Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-        DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
-            "Boost Communication Error: " << error.message() << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
+            "Boost Communication Error: " << error.message() << std::endl;)
         handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
         return;
     }
@@ -1066,6 +1063,7 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, s
     }else{
         if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
         m_pError=pErr;
+        shutdownSocket();
     }
     return status;
 }
@@ -1158,7 +1156,7 @@ void DrillClientImpl::sendAck(InBoundRpcMessage& msg, bool isOk){
     OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack);
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
     sendSync(ack_msg);
-    DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;)
 }
 
 void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
@@ -1166,7 +1164,7 @@ void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
     uint64_t coordId = this->getNextCoordinationId();
     OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId);
     sendSync(cancel_msg);
-    DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;)
 }
 
 void DrillClientImpl::shutdownSocket(){
@@ -1174,7 +1172,7 @@ void DrillClientImpl::shutdownSocket(){
     boost::system::error_code ignorederr;
     m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
     m_bIsConnected=false;
-    DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;)
 }
 
 // This COPIES the FieldMetadata definition for the record batch.  ColumnDefs held by this
@@ -1236,7 +1234,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
         RecordBatch* b,
         DrillClientError* err) {
     //ctx; // unused, we already have the this pointer
-    DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;)
     //check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server.
     if(this->m_bCancel){
         if(b!=NULL) delete b;
@@ -1247,8 +1245,8 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
         {
             if(b!=NULL){
 #ifdef DEBUG
-                DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
-                    << "Query result listener saved result to queue." << std::endl;
+                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
+                    << "Query result listener saved result to queue." << std::endl;)
 #endif
                 boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
                 this->m_recordBatches.push(b);
@@ -1267,7 +1265,7 @@ RecordBatch*  DrillClientQueryResult::peekNext(){
     boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
     //if no more data, return NULL;
     if(!m_bIsQueryPending) return NULL;
-    DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;)
     while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) {
         this->m_cv.wait(cvLock);
     }
@@ -1281,14 +1279,14 @@ RecordBatch*  DrillClientQueryResult::getNext() {
     boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
     //if no more data, return NULL;
     if(!m_bIsQueryPending){
-        DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;)
         if(!m_recordBatches.empty()){
-            DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl;)
         }
         return NULL;
     }
 
-    DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;)
     while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending){
         this->m_cv.wait(cvLock);
     }
@@ -1367,7 +1365,7 @@ void DrillClientQueryResult::clearAndDestroy(){
         m_columnDefs->clear();
     }
     if(this->m_pQueryId!=NULL){
-        DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;)
     }
     //Tell the parent to remove this from its lists
     m_pClient->clearMapEntries(this);
@@ -1379,7 +1377,7 @@ void DrillClientQueryResult::clearAndDestroy(){
     if(!m_recordBatches.empty()){
         // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the server _after_
         // the last chunk has been received. We eventually delete it.
-        DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;)
         RecordBatch* pR=NULL;
         while(!m_recordBatches.empty()){
             pR=m_recordBatches.front();
@@ -1392,6 +1390,210 @@ void DrillClientQueryResult::clearAndDestroy(){
     }
 }
 
+
+connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
+    connectionStatus_t stat = CONN_SUCCESS;
+    std::string pathToDrill, protocol, hostPortStr;
+    std::string host;
+    std::string port;
+    m_connectStr=connStr;
+    Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+    if(!strcmp(protocol.c_str(), "zk")){
+        // Get a list of drillbits
+        ZookeeperImpl zook;
+        std::vector<std::string> drillbits;
+        int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
+        if(!err){
+            Utils::shuffle(drillbits);
+            // The original shuffled order is maintained if we shuffle first and then add any missing elements
+            Utils::add(m_drillbits, drillbits);
+            exec::DrillbitEndpoint e;
+            size_t nextIndex=0;
+            {
+                boost::lock_guard<boost::mutex> cLock(m_cMutex);
+                m_lastConnection++;
+                nextIndex = (m_lastConnection)%(getDrillbitCount());
+            }
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
+                    << "(" << (void*)this << ")"
+                    << ": Current counter is: " 
+                    << m_lastConnection << std::endl;)
+                err=zook.getEndPoint(m_drillbits, nextIndex, e);
+            if(!err){
+                host=boost::lexical_cast<std::string>(e.address());
+                port=boost::lexical_cast<std::string>(e.user_port());
+            }
+        }
+        if(err){
+            return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
+        }
+        zook.close();
+        m_bIsDirectConnection=false;
+    }else if(!strcmp(protocol.c_str(), "local")){
+        char tempStr[MAX_CONNECT_STR+1];
+        strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
+        host=strtok(tempStr, ":");
+        port=strtok(NULL, "");
+        m_bIsDirectConnection=true;
+    }else{
+        return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
+    }
+    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;)
+        DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
+    stat =  pDrillClientImpl->connect(host.c_str(), port.c_str());
+    if(stat == CONN_SUCCESS){
+        boost::lock_guard<boost::mutex> lock(m_poolMutex);
+        m_clientConnections.push_back(pDrillClientImpl);
+    }else{
+        DrillClientError* pErr = pDrillClientImpl->getError();
+        handleConnError((connectionStatus_t)pErr->status, pErr->msg);
+        delete pDrillClientImpl;
+    }
+    return stat;
+}
+
+connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
+    // Assume there is one valid connection to at least one drillbit
+    connectionStatus_t stat=CONN_FAILURE;
+    // Keep a copy of the user properties
+    if(props!=NULL){
+        m_pUserProperties = new DrillUserProperties;
+        for(size_t i=0; i<props->size(); i++){
+            m_pUserProperties->setProperty(
+                    props->keyAt(i),
+                    props->valueAt(i)
+                    );
+        }
+    }
+    DrillClientImpl* pDrillClientImpl = getOneConnection();
+    if(pDrillClientImpl != NULL){
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
+        stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
+    }
+    else{
+        stat =  handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN));
+    }
+    return stat;
+}
+
+DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
+    DrillClientQueryResult* pDrillClientQueryResult = NULL;
+    DrillClientImpl* pDrillClientImpl = NULL;
+    pDrillClientImpl = getOneConnection();
+    if(pDrillClientImpl != NULL){
+        pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
+        m_queriesExecuted++;
+    }
+    return pDrillClientQueryResult;
+}
+
+void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){
+    // Nothing to do. If this class ever keeps track of executing queries then it will need 
+    // to implement this call to free any query specific resources the pool might have 
+    // allocated
+    return;
+}
+
+bool PooledDrillClientImpl::Active(){
+    boost::lock_guard<boost::mutex> lock(m_poolMutex);
+    for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+        if((*it)->Active()){
+            return true;
+        }
+    }
+    return false;
+}
+
+void PooledDrillClientImpl::Close() {
+    boost::lock_guard<boost::mutex> lock(m_poolMutex);
+    for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+        (*it)->Close();
+        delete *it;
+    }
+    m_clientConnections.clear();
+    if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;}
+    if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+    m_lastConnection=-1;
+    m_queriesExecuted=0;
+}
+
+DrillClientError* PooledDrillClientImpl::getError(){
+    std::string errMsg;
+    std::string nl="";
+    uint32_t stat;
+    boost::lock_guard<boost::mutex> lock(m_poolMutex);
+    for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+        if((*it)->getError() != NULL){
+            errMsg+=nl+"Query"/*+(*it)->queryId() +*/":"+(*it)->getError()->msg;
+            stat=(*it)->getError()->status;
+        }
+    }
+    if(errMsg.length()>0){
+        if(m_pError!=NULL){ delete m_pError; m_pError=NULL; }
+        m_pError = new DrillClientError(stat, DrillClientError::QRY_ERROR_START+stat, errMsg);
+    }
+    return m_pError;
+}
+
+//Waits as long as any one drillbit connection has results pending
+void PooledDrillClientImpl::waitForResults(){
+    boost::lock_guard<boost::mutex> lock(m_poolMutex);
+    for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+        (*it)->waitForResults();
+    }
+    return;
+}
+
+connectionStatus_t PooledDrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){
+    DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
+    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Connection Error: (Pooled) " << pErr->msg << std::endl;)
+    if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+    m_pError=pErr;
+    return status;
+}
+
+DrillClientImpl* PooledDrillClientImpl::getOneConnection(){
+    DrillClientImpl* pDrillClientImpl = NULL;
+    while(pDrillClientImpl==NULL){
+        if(m_queriesExecuted == 0){
+            // First query ever sent can use the connection already established to authenticate the user
+            boost::lock_guard<boost::mutex> lock(m_poolMutex);
+            pDrillClientImpl=m_clientConnections[0];// There should be one connection in the list when the first query is executed
+        }else if(m_clientConnections.size() == m_maxConcurrentConnections){
+            // Pool is full. Use one of the already established connections
+            boost::lock_guard<boost::mutex> lock(m_poolMutex);
+            pDrillClientImpl = m_clientConnections[m_queriesExecuted%m_maxConcurrentConnections];
+            if(!pDrillClientImpl->Active()){
+                Utils::eraseRemove(m_clientConnections, pDrillClientImpl);
+                pDrillClientImpl=NULL;
+            }
+        }else{
+            int tries=0;
+            connectionStatus_t ret=CONN_SUCCESS;
+            while(pDrillClientImpl==NULL && tries++ < 3){
+                if((ret=connect(m_connectStr.c_str()))==CONN_SUCCESS){
+                    boost::lock_guard<boost::mutex> lock(m_poolMutex);
+                    pDrillClientImpl=m_clientConnections.back();
+                    ret=pDrillClientImpl->validateHandshake(m_pUserProperties);
+                    if(ret!=CONN_SUCCESS){
+                        delete pDrillClientImpl; pDrillClientImpl=NULL;
+                        m_clientConnections.erase(m_clientConnections.end());
+                    }
+                }
+            } // try a few times
+            if(ret!=CONN_SUCCESS){
+                break;
+            }
+        } // need a new connection 
+    }// while
+
+    if(pDrillClientImpl==NULL){
+        connectionStatus_t status = CONN_NOTCONNECTED;
+        handleConnError(status, getMessage(status));
+    }
+    return pDrillClientImpl;
+}
+
 char ZookeeperImpl::s_drillRoot[]="/drill/";
 char ZookeeperImpl::s_defaultCluster[]="drillbits1";
 
@@ -1427,6 +1629,96 @@ ZooLogLevel ZookeeperImpl::getZkLogLevel(){
     return ZOO_LOG_LEVEL_ERROR;
 }
 
+int ZookeeperImpl::getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits){
+    uint32_t waitTime=30000; // 10 seconds
+    zoo_set_debug_level(getZkLogLevel());
+    zoo_deterministic_conn_order(1); // enable deterministic order
+    struct String_vector* pDrillbits=NULL;
+    m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0);
+    if(!m_zh) {
+        m_err = getMessage(ERR_CONN_ZKFAIL);
+        zookeeper_close(m_zh);
+        return -1;
+    }else{
+        m_err="";
+        //Wait for the completion handler to signal successful connection
+        boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex);
+        boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime);
+        while(this->m_bConnecting) {
+            if(!this->m_cv.timed_wait(bufferLock, timeout)){
+                m_err = getMessage(ERR_CONN_ZKTIMOUT);
+                zookeeper_close(m_zh);
+                return -1;
+            }
+        }
+    }
+    if(m_state!=ZOO_CONNECTED_STATE){
+        zookeeper_close(m_zh);
+        return -1;
+    }
+    int rc = ZOK;
+    if(pathToDrill==NULL || strlen(pathToDrill)==0){
+        m_rootDir=s_drillRoot;
+        m_rootDir += s_defaultCluster;
+    }else{
+        m_rootDir=pathToDrill;
+    }
+
+    pDrillbits = new String_vector;
+    rc=zoo_get_children(m_zh, m_rootDir.c_str(), 0, pDrillbits);
+    if(rc!=ZOK){
+        delete pDrillbits;
+        m_err=getMessage(ERR_CONN_ZKERR, rc);
+        zookeeper_close(m_zh);
+        return -1;
+    }
+    if(pDrillbits && pDrillbits->count > 0){
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << pDrillbits->count << " drillbits in cluster (" 
+                << connectStr << "/" << pathToDrill
+                << ")." <<std::endl;)
+            for(int i=0; i<pDrillbits->count; i++){
+                drillbits.push_back(pDrillbits->data[i]);
+            }
+        for(int i=0; i<drillbits.size(); i++){
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " << drillbits[i] << std::endl;)
+        }
+    }
+    delete pDrillbits;
+    return 0;
+}
+
+int ZookeeperImpl::getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint){
+    int rc = ZOK;
+    exec::DrillServiceInstance drillServiceInstance;
+    if( drillbits.size() >0){
+        // pick the drillbit at 'index'
+        const char * bit=drillbits[index].c_str();
+        std::string s;
+        s=m_rootDir +  std::string("/") + bit;
+        int buffer_len=MAX_CONNECT_STR;
+        char buffer[MAX_CONNECT_STR+1];
+        struct Stat stat;
+        buffer[MAX_CONNECT_STR]=0;
+        rc= zoo_get(m_zh, s.c_str(), 0, buffer,  &buffer_len, &stat);
+        if(rc!=ZOK){
+            m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
+            zookeeper_close(m_zh);
+            return -1;
+        }
+        exec::DrillServiceInstance drillServiceInstance;
+        drillServiceInstance.ParseFromArray(buffer, buffer_len);
+        endpoint=drillServiceInstance.endpoint();
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" <<index << ">. Selected " << drillServiceInstance.DebugString() << std::endl;)
+    }else{
+
+        m_err=getMessage(ERR_CONN_ZKNODBIT);
+        zookeeper_close(m_zh);
+        return -1;
+    }
+    return 0;
+}
+
+// Deprecated
 int ZookeeperImpl::connectToZookeeper(const char* connectStr, const char* pathToDrill){
     uint32_t waitTime=30000; // 10 seconds
     zoo_set_debug_level(getZkLogLevel());
@@ -1525,7 +1817,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat
     // signal the cond var
     {
         if (state == ZOO_CONNECTED_STATE){
-            DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;)
         }
         boost::lock_guard<boost::mutex> bufferLock(self->m_cvMutex);
         self->m_bConnecting=false;
@@ -1535,7 +1827,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat
 
 void ZookeeperImpl:: debugPrint(){
     if(m_zh!=NULL && m_state==ZOO_CONNECTED_STATE){
-        DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;)
     }
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index f19a015..06f37e0 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -34,13 +34,18 @@
 #include <queue>
 #include <vector>
 #include <boost/asio.hpp>
-#include <boost/asio/deadline_timer.hpp>
-#include <boost/thread.hpp>
-#ifdef _WIN32
+
+#if defined _WIN32  || defined _WIN64
 #include <zookeeper.h>
+//Windows header files redefine 'random'
+#ifdef random
+#undef random
+#endif
 #else
 #include <zookeeper/zookeeper.h>
 #endif
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/thread.hpp>
 
 #include "drill/drillClient.hpp"
 #include "rpcEncoder.hpp"
@@ -58,12 +63,50 @@ class RecordBatch;
 class RpcEncoder;
 class RpcDecoder;
 
+/*
+ * Defines the interface used by DrillClient and implemented by DrillClientImpl and PooledDrillClientImpl
+ * */
+class DrillClientImplBase{
+    public:
+        DrillClientImplBase(){
+        }
+
+        virtual ~DrillClientImplBase(){
+        }
+
+        //Connect via Zookeeper or directly.
+        //Makes an initial connection to a drillbit. successful connect adds the first drillbit to the pool.
+        virtual connectionStatus_t connect(const char* connStr)=0;
+
+        // Test whether the client is active. Returns true if any one of the underlying connections is active
+        virtual bool Active()=0;
+
+        // Closes all open connections. 
+        virtual void Close()=0;
+
+        // Returns the last error encountered by any of the underlying executing queries or connections
+        virtual DrillClientError* getError()=0;
+
+        // Submits a query to a drillbit. 
+        virtual DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx)=0;
+
+        //Waits as a connection has results pending
+        virtual void waitForResults()=0;
+
+        //Validates handshake at connect time.
+        virtual connectionStatus_t validateHandshake(DrillUserProperties* props)=0;
+
+        virtual void freeQueryResources(DrillClientQueryResult* pQryResult)=0;
+
+};
+
 class DrillClientQueryResult{
     friend class DrillClientImpl;
     public:
-    DrillClientQueryResult(DrillClientImpl * pClient, uint64_t coordId):
+    DrillClientQueryResult(DrillClientImpl * pClient, uint64_t coordId, const std::string& query):
         m_pClient(pClient),
         m_coordinationId(coordId),
+        m_query(query),
         m_numBatches(0),
         m_columnDefs(new std::vector<Drill::FieldMetadata*>),
         m_bIsQueryPending(true),
@@ -116,6 +159,7 @@ class DrillClientQueryResult{
     bool isCancelled(){return this->m_bCancel;};
     bool hasSchemaChanged(){return this->m_bHasSchemaChanged;};
     int32_t getCoordinationId(){ return this->m_coordinationId;}
+    const std::string&  getQuery(){ return this->m_query;}
 
     void setQueryId(exec::shared::QueryId* q){this->m_pQueryId=q;}
     void* getListenerContext() {return this->m_pListenerCtx;}
@@ -147,6 +191,8 @@ class DrillClientQueryResult{
     DrillClientImpl* m_pClient;
 
     int32_t m_coordinationId;
+    const std::string& m_query;
+
     size_t m_numBatches; // number of record batches received so far
 
     // Vector of Buffers holding data returned by the server
@@ -189,7 +235,7 @@ class DrillClientQueryResult{
     void * m_pListenerCtx;
 };
 
-class DrillClientImpl{
+class DrillClientImpl : public DrillClientImplBase{
     public:
         DrillClientImpl():
             m_coordinationId(1),
@@ -256,9 +302,14 @@ class DrillClientImpl{
         DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);
         void waitForResults();
         connectionStatus_t validateHandshake(DrillUserProperties* props);
+        void freeQueryResources(DrillClientQueryResult* pQryResult){
+            // Doesn't need to do anything
+            return;
+        };
 
     private:
         friend class DrillClientQueryResult;
+        friend class PooledDrillClientImpl;
 
         struct compareQueryId{
             bool operator()(const exec::shared::QueryId* q1, const exec::shared::QueryId* q2) const {
@@ -275,7 +326,6 @@ class DrillClientImpl{
         void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out.
 
         int32_t getNextCoordinationId(){ return ++m_coordinationId; };
-        void parseConnectStr(const char* connectStr, std::string& pathToDrill, std::string& protocol, std::string& hostPortStr);
         // send synchronous messages
         //connectionStatus_t recvSync(InBoundRpcMessage& msg);
         connectionStatus_t sendSync(OutBoundRpcMessage& msg);
@@ -331,6 +381,9 @@ class DrillClientImpl{
         std::string m_handshakeErrorMsg;
         bool m_bIsConnected;
 
+        std::string m_connectStr; 
+
+        // 
         // number of outstanding read requests.
         // handleRead will keep asking for more results as long as this number is not zero.
         size_t m_pendingRequests;
@@ -356,6 +409,8 @@ class DrillClientImpl{
         boost::asio::deadline_timer m_deadlineTimer; // to timeout async queries that never return
         boost::asio::deadline_timer m_heartbeatTimer; // to send heartbeat messages
 
+        std::string m_connectedHost; // The hostname and port the socket is connected to.
+
         //for synchronous messages, like validate handshake
         ByteBuf_t m_rbuf; // buffer for receiving synchronous messages
         DataBuf m_wbuf; // buffer for sending synchronous message
@@ -372,12 +427,106 @@ class DrillClientImpl{
         // Condition variable to signal completion of all queries. 
         boost::condition_variable m_cv;
 
+        bool m_bIsDirectConnection;
 };
 
 inline bool DrillClientImpl::Active() {
     return this->m_bIsConnected;;
 }
 
+
+/* *
+ *  Provides the same public interface as a DrillClientImpl but holds a pool of DrillClientImpls.
+ *  Every submitQuery uses a different DrillClientImpl to distribute the load.
+ *  DrillClient can use this class instead of DrillClientImpl to get better load balancing.
+ * */
+class PooledDrillClientImpl : public DrillClientImplBase{
+    public:
+        PooledDrillClientImpl(){
+            m_bIsDirectConnection=false;
+            m_maxConcurrentConnections = DEFAULT_MAX_CONCURRENT_CONNECTIONS;
+            char* maxConn=std::getenv(MAX_CONCURRENT_CONNECTIONS_ENV);
+            if(maxConn!=NULL){
+                m_maxConcurrentConnections=atoi(maxConn);
+            }
+            m_lastConnection=-1;
+            m_pError=NULL;
+            m_queriesExecuted=0;
+            m_pUserProperties=NULL;
+        }
+
+        ~PooledDrillClientImpl(){
+            for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+                delete *it;
+            }
+            m_clientConnections.clear();
+            if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;}
+            if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+        }
+
+        //Connect via Zookeeper or directly.
+        //Makes an initial connection to a drillbit. successful connect adds the first drillbit to the pool.
+        connectionStatus_t connect(const char* connStr);
+
+        // Test whether the client is active. Returns true if any one of the underlying connections is active
+        bool Active();
+
+        // Closes all open connections. 
+        void Close() ;
+
+        // Returns the last error encountered by any of the underlying executing queries or connections
+        DrillClientError* getError();
+
+        // Submits a query to a drillbit. If more than one query is to be sent, we may choose a
+        // a different drillbit in the pool. No more than m_maxConcurrentConnections will be allowed.
+        // Connections once added to the pool will be removed only when the DrillClient is closed.
+        DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);
+
+        //Waits as long as any one drillbit connection has results pending
+        void waitForResults();
+
+        //Validates handshake only against the first drillbit connected to.
+        connectionStatus_t validateHandshake(DrillUserProperties* props);
+
+        void freeQueryResources(DrillClientQueryResult* pQryResult);
+
+        int getDrillbitCount(){ return m_drillbits.size();};
+
+    private:
+        
+        std::string m_connectStr; 
+        std::string m_lastQuery;
+        
+        // A list of all the current client connections. We choose a new one for every query. 
+        // When picking a drillClientImpl to use, we see how many queries each drillClientImpl
+        // is currently executing. If none,  
+        std::vector<DrillClientImpl*> m_clientConnections; 
+		boost::mutex m_poolMutex; // protect access to the vector
+        
+        //ZookeeperImpl zook;
+        
+        // Use this to decide which drillbit to select next from the list of drillbits.
+        size_t m_lastConnection;
+		boost::mutex m_cMutex;
+
+        // Number of queries executed so far. Can be used to select a new Drillbit from the pool.
+        size_t m_queriesExecuted;
+
+        size_t m_maxConcurrentConnections;
+
+        bool m_bIsDirectConnection;
+
+        DrillClientError* m_pError;
+
+        connectionStatus_t handleConnError(connectionStatus_t status, std::string msg);
+        // get a connection from the pool or create a new one. Return NULL if none is found
+        DrillClientImpl* getOneConnection();
+
+        std::vector<std::string> m_drillbits;
+
+        DrillUserProperties* m_pUserProperties;//Keep a copy of user properties
+};
+
 class ZookeeperImpl{
     public:
         ZookeeperImpl();
@@ -385,12 +534,17 @@ class ZookeeperImpl{
         static ZooLogLevel getZkLogLevel();
         // comma separated host:port pairs, each corresponding to a zk
         // server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
-        int connectToZookeeper(const char* connectStr, const char* pathToDrill);
+        DEPRECATED int connectToZookeeper(const char* connectStr, const char* pathToDrill);
         void close();
         static void watcher(zhandle_t *zzh, int type, int state, const char *path, void* context);
         void debugPrint();
         std::string& getError(){return m_err;}
         const exec::DrillbitEndpoint& getEndPoint(){ return m_drillServiceInstance.endpoint();}
+        // return unshuffled list of drillbits
+        int getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits);
+        // picks the index drillbit and returns the corresponding endpoint object
+        int getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint);
+        
 
     private:
         static char s_drillRoot[];
@@ -407,6 +561,7 @@ class ZookeeperImpl{
         boost::condition_variable m_cv;
         bool m_bConnecting;
         exec::DrillServiceInstance m_drillServiceInstance;
+        std::string m_rootDir;
 };
 
 } // namespace Drill

http://git-wip-us.apache.org/repos/asf/drill/blob/df0f0af3/contrib/native/client/src/clientlib/env.h.in
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/env.h.in b/contrib/native/client/src/clientlib/env.h.in
new file mode 100644
index 0000000..a32f152
--- /dev/null
+++ b/contrib/native/client/src/clientlib/env.h.in
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ENV_H
+#define ENV_H
+
+#define GIT_COMMIT_PROP @GIT_COMMIT_PROP@
+
+#endif
+
+


Mime
View raw message