drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [10/13] drill git commit: DRILL-5431: SSL Support (C++) - Update DrillClientImpl to use Channel implementation
Date Thu, 12 Oct 2017 18:28:22 GMT
DRILL-5431: SSL Support (C++) - Update DrillClientImpl to use Channel implementation

Also remove ChannelContextFactory and merge it into ChannelFactory


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f246c3ca
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f246c3ca
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f246c3ca

Branch: refs/heads/master
Commit: f246c3cad7f44baeb8153913052ebc963c62276a
Parents: facbb92
Author: Parth Chandra <parthc@apache.org>
Authored: Mon Jul 24 12:55:02 2017 -0700
Committer: Parth Chandra <parthc@apache.org>
Committed: Wed Oct 11 19:27:48 2017 -0700

----------------------------------------------------------------------
 contrib/native/client/CMakeLists.txt            |   3 +
 .../native/client/example/querySubmitter.cpp    |  33 ++-
 contrib/native/client/readme.linux              |  13 +
 contrib/native/client/readme.macos              |  33 ++-
 contrib/native/client/readme.sasl               |  93 +++++++
 contrib/native/client/readme.ssl                |   4 +-
 contrib/native/client/readme.win.txt            |  14 +-
 .../native/client/src/clientlib/CMakeLists.txt  |   2 +-
 contrib/native/client/src/clientlib/channel.cpp | 146 ++++------
 contrib/native/client/src/clientlib/channel.hpp |  58 ++--
 .../client/src/clientlib/drillClientImpl.cpp    | 270 ++++++++++---------
 .../client/src/clientlib/drillClientImpl.hpp    |  38 ++-
 .../native/client/src/clientlib/drillConfig.cpp |   2 +-
 .../native/client/src/include/drill/common.hpp  |   7 +-
 .../client/src/include/drill/drillConfig.hpp    |  22 +-
 .../client/src/include/drill/userProperties.hpp |   9 +-
 contrib/native/client/test/ssl/testSSL.cpp      |   7 +-
 pom.xml                                         |   3 +-
 18 files changed, 430 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/CMakeLists.txt b/contrib/native/client/CMakeLists.txt
index 0c104ab..3f6c44b 100644
--- a/contrib/native/client/CMakeLists.txt
+++ b/contrib/native/client/CMakeLists.txt
@@ -126,6 +126,9 @@ endif()
 find_package(Protobuf REQUIRED )
 include_directories(${PROTOBUF_INCLUDE_DIR})
 
+if (MSVC)
+    set(OPENSSL_USE_STATIC_LIBS TRUE)
+endif()
 #Find SSL
 find_package(OpenSSL REQUIRED )
 if(OPENSSL_FOUND)

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 47e55de..43b909d 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -25,7 +24,7 @@
 #include <boost/algorithm/string/join.hpp>
 #include "drill/drillc.hpp"
 
-int nOptions=19;
+int nOptions=25;
 
 struct Option{
     char name[32];
@@ -50,7 +49,14 @@ struct Option{
     {"service_host", "Service host for Kerberos", false},
     {"service_name", "Service name for Kerberos", false},
     {"auth", "Authentication mechanism to use", false},
-    {"sasl_encrypt", "Negotiate for encrypted connection", false}
+    {"sasl_encrypt", "Negotiate for encrypted connection", false},
+    {"enableSSL", "Enable SSL", false},
+    {"TLSProtocol", "TLS protocol version", false},
+    {"certFilePath", "Path to SSL certificate file", false},
+    {"disableHostnameVerification", "disable host name verification", false},
+    {"disableCertVerification", "disable certificate verification", false},
+    {"useSystemTrustStore", "[Windows only]. Use the system truststore.", false }
+
 };
 
 std::map<std::string, std::string> qsOptionValues;
@@ -304,6 +310,12 @@ int main(int argc, char* argv[]) {
         std::string serviceHost=qsOptionValues["service_host"];
         std::string serviceName=qsOptionValues["service_name"];
         std::string auth=qsOptionValues["auth"];
+        std::string enableSSL=qsOptionValues["enableSSL"];
+        std::string tlsProtocol=qsOptionValues["TLSProtocol"];
+        std::string certFilePath=qsOptionValues["certFilePath"];
+        std::string disableHostnameVerification=qsOptionValues["disableHostnameVerification"];
+        std::string disableCertVerification=qsOptionValues["disableCertVerification"];
+        std::string useSystemTrustStore = qsOptionValues["useSystemTrustStore"];
 
         Drill::QueryType type;
 
@@ -392,6 +404,20 @@ int main(int argc, char* argv[]) {
         if(auth.length()>0){
             props.setProperty(USERPROP_AUTH_MECHANISM, auth);
         }
+        if(enableSSL.length()>0){
+            props.setProperty(USERPROP_USESSL, enableSSL);
+			if (enableSSL == "true" && certFilePath.length() <= 0 && useSystemTrustStore.length() <= 0){
+                std::cerr<< "SSL is enabled but no certificate or truststore provided. " << std::endl;
+                return -1;
+            }
+            props.setProperty(USERPROP_TLSPROTOCOL, tlsProtocol);
+            props.setProperty(USERPROP_CERTFILEPATH, certFilePath);
+            props.setProperty(USERPROP_DISABLE_HOSTVERIFICATION, disableHostnameVerification);
+            props.setProperty(USERPROP_DISABLE_CERTVERIFICATION, disableCertVerification);
+			if (useSystemTrustStore.length() > 0){
+				props.setProperty(USERPROP_USESYSTEMTRUSTSTORE, useSystemTrustStore);
+			}
+        }
 
         if(client.connect(connectStr.c_str(), &props)!=Drill::CONN_SUCCESS){
             std::cerr<< "Failed to connect with error: "<< client.getError() << " (Using:"<<connectStr<<")"<<std::endl;
@@ -548,3 +574,4 @@ int main(int argc, char* argv[]) {
 
     return 0;
 }
+

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/readme.linux
----------------------------------------------------------------------
diff --git a/contrib/native/client/readme.linux b/contrib/native/client/readme.linux
index 4eaeea5..34c791b 100644
--- a/contrib/native/client/readme.linux
+++ b/contrib/native/client/readme.linux
@@ -84,6 +84,19 @@ OR
     ln -svf libboost_filesystem.a libboost_filesystem-mt.a
     ln -svf libboost_date_time.a libboost_date_time-mt.a
 
+5) Install or build Cyrus SASL
+   To Install 
+       yum install cyrus-sasl-devel cyrus-sasl-gssapi
+   libs are installed in /usr/lib64/sasl2
+   includes are installed in /usr/include
+
+   To build your own 
+   See readme.sasl for instructions
+
+6) Install OpenSSL
+   yum install openssl-devel openssl
+
+
 (Optional) Refresh protobuf source files
 ----------------------------------------
 When changes have been introduced to the protocol module, you might need to refresh the protobuf C++ source files too.

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/readme.macos
----------------------------------------------------------------------
diff --git a/contrib/native/client/readme.macos b/contrib/native/client/readme.macos
index eee017e..e9be712 100644
--- a/contrib/native/client/readme.macos
+++ b/contrib/native/client/readme.macos
@@ -44,12 +44,23 @@ Install Prerequisites
 2.2) Install zookeeper
   $> brew install zookeeper
 
-2.3) Install boost
+2.3) Install or build Cyrus SASL 
+   To Install (need macports, brew did not work for me)
+       port install cyrus-sasl2
+
+   To build your own 
+   See readme.sasl for instructions
+
+2.4) Install OpenSSL
+   [NOTE: MacOS has a neutered version of openssl installed. Install the build using brew. Make sure the brew installation is picked up]
+   brew install openssl
+
+2.5) Install boost
   $> brew install boost
 
-2.3.1) For production builds, see the readme.boost file  
+2.5.1) For production builds, see the readme.boost file  
   
-2.3.1.1 Build using XCODE
+2.6) Build using XCODE
 =========================  
 (Optional) Refresh protobuf source files
 ----------------------------------------
@@ -66,7 +77,7 @@ Build drill client
 -------------------
   $> cd DRILL_DIR/contrib/native/client
   $> mkdir build
-  $> cd build && cmake -G "Xcode" -D CMAKE_BUILD_TYPE=Debug ..
+  $> cd build && cmake -G "Xcode" -DOPENSSL_ROOT_DIR="/usr/local/opt/openssl" -D CMAKE_BUILD_TYPE=Debug ..
   $> xcodebuild -project drillclient.xcodeproj -configuration ${BUILDTYPE} -target ALL_BUILD
 
 
@@ -74,7 +85,7 @@ XCode IDE
 ---------
   You can open the drillclient.xcodeproj file in the XCode ide and run/debug as with any other command line app
 
-2.3.1.2 Build using MAKE
+2.7) Build using MAKE
 ========================
 (Optional) Refresh protobuf source files
 ----------------------------------------
@@ -90,16 +101,22 @@ Build drill client
 -------------------
     $> cd DRILL_DIR/contrib/native/client
     $> mkdir build
-    $> cd build && cmake3 -G "Unix Makefiles" -D CMAKE_BUILD_TYPE=Debug ..
+    $> cd build && cmake3 -G "Unix Makefiles" -DOPENSSL_ROOT_DIR="/usr/local/opt/openssl" -D CMAKE_BUILD_TYPE=Debug ..
     $> make
 
+2.8) Build using CLion
+CLion can recognize cmake projects automatically. Check CLion documentation for help on how to use CMake with CLion.
+To prevent CLion's cmake from picking up the system installed OpenSSL set the following define in CLion/Preferences/Build, Execution, Deployment/CMake/CMake Options
+-DOPENSSL_ROOT_DIR="/usr/local/opt/openssl"
+Then reload the CMake project making sure to invalidate the CMake cache
+Tools/CMake/Reset Cache and Reload Project
 
-2.4 Test
+2.9 Test
 --------
 Run query submitter from the command line
   $> querySubmitter query='select * from dfs.`/Users/pchandra/work/data/tpc-h/customer.parquet`' type=sql connectStr=local=10.250.0.146:31010 api=async logLevel=trace user=yourUserName password=yourPassWord
 
-2.5 Valgrind
+2.10 Valgrind
 ------------
   Install valgrind using brew
   $> brew install valgrind

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/readme.sasl
----------------------------------------------------------------------
diff --git a/contrib/native/client/readme.sasl b/contrib/native/client/readme.sasl
new file mode 100644
index 0000000..3eab471
--- /dev/null
+++ b/contrib/native/client/readme.sasl
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+## On Mac OS and Linux
+
+* Download cyrus-sasl tarball or clone git sources.
+
+You can get the latest tarball from [here](ftp://ftp.cyrusimap.org/cyrus-sasl/).
+
+Note: If you download the source tarball, the configure script is already present. If you have cloned the source from git, the configure script will be missing and needs to be generated. The steps to generate it are not clear. You can overcome this issue by copying the configure script from the tarball into the folder containing the git sources.
+
+* Run configure
+
+`./configure --disable-cram --disable-digest --disable-scram --disable-login --disable-otp --disable-passdss --disable-krb4 --disable-srp --without-des CFLAGS="-g"`
+
+* Build the sources and install
+    
+    `make`
+
+    `make install`
+
+(On Mac) - The SASL library should now be available in _/usr/local/lib/sasl2_
+
+    -rwxr-xr-x  1 root  admin    659 Dec 23 15:24 libanonymous.la
+    -rwxr-xr-x  1 root  admin  22276 Dec 23 15:24 libanonymous.plugin
+    -rwxr-xr-x  1 root  admin    706 Dec 23 15:24 libgssapiv2.la
+    -rwxr-xr-x  1 root  admin  37052 Dec 23 15:24 libgssapiv2.plugin
+    -rwxr-xr-x  1 root  admin    643 Dec 23 15:24 libplain.la
+    -rwxr-xr-x  1 root  admin  22028 Dec 23 15:24 libplain.plugin
+    -rwxr-xr-x  1 root  admin    665 Dec 23 15:24 libsasldb.la
+    -rwxr-xr-x  1 root  admin  33916 Dec 23 15:24 libsasldb.plugin
+
+## (On CentOS Linux) - 
+
+* You need to download the "krb5-devel" rpm using command below. This will download `gssapi` directory and `gssapi.h` file under `/usr/include`.      
+`yum install krb5-devel`
+
+* Copy the `gssapi` directory and the `gssapi.h` file into the `plugins` directory found under the cyrus-sasl untar directory.
+Example: 
+    * `cp -r /usr/include/gssapi ~/cyrusSasl/cyrus-sasl-2.1.26/plugins`
+    * `cp /usr/include/gssapi.h ~/cyrusSasl/cyrus-sasl-2.1.26/plugins`
+
+* Remove the config cache file if any in cyrus-sasl directory
+   * `rm -f ~/cyrusSasl/cyrus-sasl-2.1.26/config.cache`
+
+* Run configure
+
+`./configure --disable-cram --disable-digest --disable-scram --disable-login --disable-otp --disable-passdss --disable-krb4 --disable-srp --without-des CFLAGS="-g"`
+
+* Build the sources and install
+  * `make clean`
+  * `make`
+  * `make install`
+
+(On CentOS Linux) The SASL library should now be available in _/usr/local/lib/sasl2_
+
+
+     -rwxr-xr-x  1 root root   684 Mar 15 18:12 libanonymous.la     
+     -rwxr-xr-x  1 root root 53751 Mar 15 18:12 libanonymous.so.3.0.0      
+     -rwxr-xr-x  1 root root   704 Mar 15 18:12 libgs2.la       
+     -rwxr-xr-x  1 root root 81808 Mar 15 18:12 libgs2.so.3.0.0      
+     -rwxr-xr-x  1 root root   734 Mar 15 18:12 libgssapiv2.la       
+     -rwxr-xr-x  1 root root 83549 Mar 15 18:12 libgssapiv2.so.3.0.0       
+     -rwxr-xr-x  1 root root   668 Mar 15 18:12 libplain.la       
+     -rwxr-xr-x  1 root root 54515 Mar 15 18:12 libplain.so.3.0.0       
+     -rwxr-xr-x  1 root root   684 Mar 15 18:12 libsasldb.la        
+     -rwxr-xr-x  1 root root 98219 Mar 15 18:12 libsasldb.so.3.0.0        
+
+## On Win 64
+
+* To build the base library and the plain plugin
+
+Change the _PLUGINS_ variable in _plugins/NTMakefile_ to include only the PLAIN plugin. Then run `nmake /f NTMakefile CFG=Release`
+
+The build will fail in sasldb but it would have already built the base library and PLAIN.
+
+Use `nmake /f NTMakefile CFG=Debug` to get a debug build.Building Cyrus SASL
+

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/readme.ssl
----------------------------------------------------------------------
diff --git a/contrib/native/client/readme.ssl b/contrib/native/client/readme.ssl
index 86d46fc..8c875cf 100644
--- a/contrib/native/client/readme.ssl
+++ b/contrib/native/client/readme.ssl
@@ -21,7 +21,9 @@ Installing OpenSSL -
         brew install openssl
     On Linux :
 
-Set up the certificate
+Set up the certificate for testing. The files generated by this set of steps are used by the boost example programs.
+These are also the steps used to generate the test certificates used by the Drillbit unit tests.
+
     Generate a private key
 
         openssl genrsa -des3 -out drillTestServerKey.pem 1024

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/readme.win.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/readme.win.txt b/contrib/native/client/readme.win.txt
index 9391038..bd84b91 100644
--- a/contrib/native/client/readme.win.txt
+++ b/contrib/native/client/readme.win.txt
@@ -121,7 +121,6 @@ Windows platforms should be more or less similar.
 2.2 Protobuf (2.5.0)
     Get protobuf from here: https://protobuf.googlecode.com/files/protobuf-2.5.0.zip
 
-
     a) Protobuf builds static libraries
     b) In Visual Studio, open <PROTOBUF_HOME>/vsprojects/protobuf.sln. The IDE may
     update the solution file. This should go thru successfully.
@@ -152,6 +151,15 @@ Windows platforms should be more or less similar.
     c) In Visual Studio 2010 Express open <CPPUNIT_HOME>/src/CppUnitLibraries2010.sln
        i) Build cppunit project
 
+2.5 Install or build Cyrus SASL
+   To build your own see readme.sasl for instructions
+
+2.6 Install OpenSSL
+   Download from https://slproweb.com/products/Win32OpenSSL.html
+   At the time of writing the compatible version is Win32OpenSSL-1_0_2L
+   OpenSSL is installed into C:\OpenSSL-Win64. If you install the DLLs into the bin directory, make sure that directory is added to the PATH.
+
+
 3 Building Drill Clientlib
 3.1 SET the following environment variables
     set BOOST_LIBRARYDIR=<BOOST_HOME>\BUILD_TYPE
@@ -163,10 +171,10 @@ Windows platforms should be more or less similar.
     C:> cd build
 
     a) For the 32 bit build :
-        C:> cmake -G "Visual Studio 10" -D ZOOKEEPER_HOME=<ZOOKEPER_HOME> -D PROTOBUF_SRC_ROOT_FOLDER=<PROTOBUF_HOME> -D CPPUNIT_HOME=<CPPUNIT_HOME> -D CMAKE_BUILD_TYPE=Debug   ..
+        C:> cmake -G "Visual Studio 10" -D ZOOKEEPER_HOME=<ZOOKEPER_HOME> -D PROTOBUF_SRC_ROOT_FOLDER=<PROTOBUF_HOME> -D CPPUNIT_HOME=<CPPUNIT_HOME> -D SASL_LIBRARY="<SASL_HOME>\lib64_debug\libsasl.lib" -D SASL_HOME=<SASL_HOME> -D OPENSSL_ROOT_DIR=<OPENSSL_HOME> -D CMAKE_BUILD_TYPE=Debug   ..
 
     b) For the 64 bit build :
-        C:> cmake -G "Visual Studio 10 Win64 " -D ZOOKEEPER_HOME=<ZOOKEPER_HOME> -D PROTOBUF_SRC_ROOT_FOLDER=<PROTOBUF_HOME> -D CPPUNIT_HOME=<CPPUNIT_HOME> -D CMAKE_BUILD_TYPE=Debug   ..
+        C:> cmake -G "Visual Studio 10 Win64 " -D ZOOKEEPER_HOME=<ZOOKEPER_HOME> -D PROTOBUF_SRC_ROOT_FOLDER=<PROTOBUF_HOME> -D CPPUNIT_HOME=<CPPUNIT_HOME> -D SASL_LIBRARY="<SASL_HOME>\lib64_debug\libsasl.lib" -D SASL_HOME=<SASL_HOME> -D OPENSSL_ROOT_DIR=<OPENSSL_HOME> -D CMAKE_BUILD_TYPE=Debug   ..
 
 3.3 Open the generated <DRILL_HOME>/contrib/native/client/build/drillclient.sln 
     file in Visual Studio.

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/src/clientlib/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/CMakeLists.txt b/contrib/native/client/src/clientlib/CMakeLists.txt
index 2270c91..7b9ecc3 100644
--- a/contrib/native/client/src/clientlib/CMakeLists.txt
+++ b/contrib/native/client/src/clientlib/CMakeLists.txt
@@ -40,7 +40,7 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../i
 include_directories(${PROTOBUF_INCLUDE_DIR})
 include_directories(${Zookeeper_INCLUDE_DIRS})
 include_directories(${SASL_INCLUDE_DIRS})
-include_directories("${OPENSSL_INCLUDE_DIR}")
+include_directories(${OPENSSL_INCLUDE_DIR})
 
 link_directories(/usr/local/lib)
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/src/clientlib/channel.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/channel.cpp b/contrib/native/client/src/clientlib/channel.cpp
index 62ce976..84f3eb4 100644
--- a/contrib/native/client/src/clientlib/channel.cpp
+++ b/contrib/native/client/src/clientlib/channel.cpp
@@ -39,7 +39,7 @@ ConnectionEndpoint::ConnectionEndpoint(const char* connStr){
 ConnectionEndpoint::ConnectionEndpoint(const char* host, const char* port){
     m_host=host;
     m_port=port;
-    m_protocol="drillbit"; // direct connection
+    m_protocol=PROTOCOL_TYPE_DIRECT; // direct connection
     m_pError=NULL;
 }
 
@@ -61,7 +61,7 @@ connectionStatus_t ConnectionEndpoint::getDrillbitEndpoint(){
                 DRILL_LOG(LOG_INFO) << "Failed to get endpoint from zk" << std::endl;
                 return ret;
             }
-        }else if(!this->isDirectConnection()){
+        }else if(!isDirectConnection()){
             return handleError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, this->getProtocol().c_str()));
         }
     }else{
@@ -77,19 +77,19 @@ void ConnectionEndpoint::parseConnectString(){
     boost::cmatch matched;
 
     if(boost::regex_match(m_connectString.c_str(), matched, connStrExpr)){
-        m_protocol.assign(matched[1].first, matched[1].second);
+        m_protocol = matched[1].str();
         if(isDirectConnection()){
-            m_host.assign(matched[4].first, matched[4].second);
-            m_port.assign(matched[5].first, matched[5].second);
+            m_host = matched[4].str();
+            m_port = matched[5].str();
         }else {
             // if the connection is to a zookeeper,
             // we will get the host and the port only after connecting to the Zookeeper
             m_host = "";
             m_port = "";
         }
-        m_hostPortStr.assign(matched[2].first, matched[2].second);
+        m_hostPortStr = matched[2].str();
         if(matched[6].matched) {
-            m_pathToDrill.assign(matched[6].first, matched[6].second);
+            m_pathToDrill = matched[6].str();
         }
         DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)
                              << "Conn str: "<< m_connectString
@@ -106,12 +106,12 @@ void ConnectionEndpoint::parseConnectString(){
 
 bool ConnectionEndpoint::isDirectConnection(){
     assert(!m_protocol.empty());
-    return (!strcmp(m_protocol.c_str(), "local") || !strcmp(m_protocol.c_str(), "drillbit"));
+    return ( m_protocol == PROTOCOL_TYPE_DIRECT || m_protocol == PROTOCOL_TYPE_DIRECT_2 );
 }
 
 bool ConnectionEndpoint::isZookeeperConnection(){
     assert(!m_protocol.empty());
-    return (!strcmp(m_protocol.c_str(), "zk"));
+    return (m_protocol == PROTOCOL_TYPE_ZK);
 }
 
 connectionStatus_t ConnectionEndpoint::getDrillbitEndpointFromZk(){
@@ -148,138 +148,86 @@ connectionStatus_t ConnectionEndpoint::handleError(connectionStatus_t status, st
     return status;
 }
 
-/****************************
- * Channel Context Factory
- ****************************/
-ChannelContext* ChannelContextFactory::getChannelContext(channelType_t t, DrillUserProperties* props){
-    ChannelContext* pChannelContext=NULL;
-    switch(t){
-        case CHANNEL_TYPE_SOCKET:
-            pChannelContext=new ChannelContext(props);
-            break;
-#if defined(IS_SSL_ENABLED)
-        case CHANNEL_TYPE_SSLSTREAM: {
-
-            std::string protocol;
-            props->getProp(USERPROP_TLSPROTOCOL, protocol);
-            boost::asio::ssl::context::method tlsVersion = SSLChannelContext::getTlsVersion(protocol);
-
-            std::string noVerifyCert;
-            props->getProp(USERPROP_DISABLE_CERTVERIFICATION, noVerifyCert);
-            boost::asio::ssl::context::verify_mode verifyMode = boost::asio::ssl::context::verify_peer;
-            if (noVerifyCert == "true") {
-                verifyMode = boost::asio::ssl::context::verify_none;
-            }
-
-            pChannelContext = new SSLChannelContext(props, tlsVersion, verifyMode);
-        }
-            break;
-#endif
-        default:
-            DRILL_LOG(LOG_ERROR) << "Channel type " << t << " is not supported." << std::endl;
-            break;
-    }
-    return pChannelContext;
-} 
-
 /*******************
  *  ChannelFactory
  * *****************/
-Channel* ChannelFactory::getChannel(channelType_t t, const char* connStr){
+Channel* ChannelFactory::getChannel(channelType_t t, boost::asio::io_service& ioService, const char* connStr, DrillUserProperties* props){
     Channel* pChannel=NULL;
+    ChannelContext_t * pChannelContext = ChannelFactory::getChannelContext(t, props);
     switch(t){
         case CHANNEL_TYPE_SOCKET:
-            pChannel=new SocketChannel(connStr);
+            pChannel=new SocketChannel(ioService, connStr);
             break;
 #if defined(IS_SSL_ENABLED)
         case CHANNEL_TYPE_SSLSTREAM:
-            pChannel=new SSLStreamChannel(connStr);
+            pChannel=new SSLStreamChannel(ioService, connStr);
             break;
 #endif
         default:
             DRILL_LOG(LOG_ERROR) << "Channel type " << t << " is not supported." << std::endl;
             break;
     }
+    pChannel->m_pContext = pChannelContext;
     return pChannel;
 }
 
-Channel* ChannelFactory::getChannel(channelType_t t, const char* host, const char* port){
+Channel* ChannelFactory::getChannel(channelType_t t, boost::asio::io_service& ioService, const char* host, const char* port, DrillUserProperties* props){
     Channel* pChannel=NULL;
+    ChannelContext_t * pChannelContext = ChannelFactory::getChannelContext(t, props);
     switch(t){
         case CHANNEL_TYPE_SOCKET:
-            pChannel=new SocketChannel(host, port);
+            pChannel=new SocketChannel(ioService, host, port);
             break;
 #if defined(IS_SSL_ENABLED)
         case CHANNEL_TYPE_SSLSTREAM:
-            pChannel=new SSLStreamChannel(host, port);
+            pChannel=new SSLStreamChannel(ioService, host, port);
             break;
 #endif
         default:
             DRILL_LOG(LOG_ERROR) << "Channel type " << t << " is not supported." << std::endl;
             break;
     }
+    pChannel->m_pContext = pChannelContext;
     return pChannel;
 }
 
-Channel* ChannelFactory::getChannel(channelType_t t, boost::asio::io_service& ioService, const char* connStr){
-    Channel* pChannel=NULL;
+ChannelContext* ChannelFactory::getChannelContext(channelType_t t, DrillUserProperties* props){
+    ChannelContext* pChannelContext=NULL;
     switch(t){
         case CHANNEL_TYPE_SOCKET:
-            pChannel=new SocketChannel(ioService, connStr);
+            pChannelContext=new ChannelContext(props);
             break;
 #if defined(IS_SSL_ENABLED)
-        case CHANNEL_TYPE_SSLSTREAM:
-            pChannel=new SSLStreamChannel(ioService, connStr);
-            break;
-#endif
-        default:
-            DRILL_LOG(LOG_ERROR) << "Channel type " << t << " is not supported." << std::endl;
-            break;
-    }
-    return pChannel;
-}
+        case CHANNEL_TYPE_SSLSTREAM: {
 
-Channel* ChannelFactory::getChannel(channelType_t t, boost::asio::io_service& ioService, const char* host, const char* port){
-    Channel* pChannel=NULL;
-    switch(t){
-        case CHANNEL_TYPE_SOCKET:
-            pChannel=new SocketChannel(ioService, host, port);
-            break;
-#if defined(IS_SSL_ENABLED)
-        case CHANNEL_TYPE_SSLSTREAM:
-            pChannel=new SSLStreamChannel(ioService, host, port);
+            std::string protocol;
+            props->getProp(USERPROP_TLSPROTOCOL, protocol);
+            boost::asio::ssl::context::method tlsVersion = SSLChannelContext::getTlsVersion(protocol);
+
+            std::string noVerifyCert;
+            props->getProp(USERPROP_DISABLE_CERTVERIFICATION, noVerifyCert);
+            boost::asio::ssl::context::verify_mode verifyMode = boost::asio::ssl::context::verify_peer;
+            if (noVerifyCert == "true") {
+                verifyMode = boost::asio::ssl::context::verify_none;
+            }
+
+            pChannelContext = new SSLChannelContext(props, tlsVersion, verifyMode);
+        }
             break;
 #endif
         default:
             DRILL_LOG(LOG_ERROR) << "Channel type " << t << " is not supported." << std::endl;
             break;
     }
-    return pChannel;
+    return pChannelContext;
 }
 
 /*******************
  *  Channel
  * *****************/
 
-Channel::Channel(const char* connStr) : m_ioService(m_ioServiceFallback){
-    m_pEndpoint=new ConnectionEndpoint(connStr);
-    m_ownIoService = true;
-    m_pSocket=NULL;
-    m_state=CHANNEL_UNINITIALIZED;
-    m_pError=NULL;
-}
-
-Channel::Channel(const char* host, const char* port) : m_ioService(m_ioServiceFallback){
-    m_pEndpoint=new ConnectionEndpoint(host, port);
-    m_ownIoService = true;
-    m_pSocket=NULL;
-    m_state=CHANNEL_UNINITIALIZED;
-    m_pError=NULL;
-}
-
 Channel::Channel(boost::asio::io_service& ioService, const char* connStr):m_ioService(ioService){
     m_pEndpoint=new ConnectionEndpoint(connStr);
-    m_ownIoService = false;
     m_pSocket=NULL;
     m_state=CHANNEL_UNINITIALIZED;
     m_pError=NULL;
@@ -287,7 +235,6 @@ Channel::Channel(boost::asio::io_service& ioService, const char* connStr):m_ioSe
 
 Channel::Channel(boost::asio::io_service& ioService, const char* host, const char* port) : m_ioService(ioService){
     m_pEndpoint=new ConnectionEndpoint(host, port);
-    m_ownIoService = true;
     m_pSocket=NULL;
     m_state=CHANNEL_UNINITIALIZED;
     m_pError=NULL;
@@ -311,10 +258,9 @@ template <typename SettableSocketOption> void Channel::setOption(SettableSocketO
     assert(0); 
 }
 
-connectionStatus_t Channel::init(ChannelContext_t* pContext){
+connectionStatus_t Channel::init(){
     connectionStatus_t ret=CONN_SUCCESS;
     this->m_state=CHANNEL_INITIALIZED;
-    this->m_pContext = pContext;
     return ret;
 }
 
@@ -389,11 +335,11 @@ connectionStatus_t Channel::connectInternal() {
 
 }
 
-connectionStatus_t SocketChannel::init(ChannelContext_t* pContext){
+connectionStatus_t SocketChannel::init(){
     connectionStatus_t ret=CONN_SUCCESS;
     m_pSocket=new Socket(m_ioService);
     if(m_pSocket!=NULL){
-        ret=Channel::init(pContext);
+        ret=Channel::init();
     }else{
         DRILL_LOG(LOG_ERROR) << "Channel initialization failure. " << std::endl;
         handleError(CONN_NOSOCKET, getMessage(ERR_CONN_NOSOCKET));
@@ -403,17 +349,17 @@ connectionStatus_t SocketChannel::init(ChannelContext_t* pContext){
 }
 
 #if defined(IS_SSL_ENABLED)
-connectionStatus_t SSLStreamChannel::init(ChannelContext_t* pContext){
+connectionStatus_t SSLStreamChannel::init(){
     connectionStatus_t ret=CONN_SUCCESS;
 
-    const DrillUserProperties* props = pContext->getUserProperties();
+    const DrillUserProperties* props = m_pContext->getUserProperties();
 	std::string useSystemTrustStore;
 	props->getProp(USERPROP_USESYSTEMTRUSTSTORE, useSystemTrustStore);
 	if (useSystemTrustStore != "true"){
 		std::string certFile;
 		props->getProp(USERPROP_CERTFILEPATH, certFile);
 		try{
-			((SSLChannelContext_t*)pContext)->getSslContext().load_verify_file(certFile);
+			((SSLChannelContext_t*)m_pContext)->getSslContext().load_verify_file(certFile);
 		}
 		catch (boost::system::system_error e){
 			DRILL_LOG(LOG_ERROR) << "Channel initialization failure. Certificate file  "
@@ -429,13 +375,13 @@ connectionStatus_t SSLStreamChannel::init(ChannelContext_t* pContext){
     props->getProp(USERPROP_DISABLE_HOSTVERIFICATION, disableHostVerification);
     if (disableHostVerification != "true") {
         std::string hostPortStr = m_pEndpoint->getHost() + ":" + m_pEndpoint->getPort();
-        ((SSLChannelContext_t *) pContext)->getSslContext().set_verify_callback(
+        ((SSLChannelContext_t *) m_pContext)->getSslContext().set_verify_callback(
                 boost::asio::ssl::rfc2818_verification(hostPortStr.c_str()));
     }
 
-    m_pSocket=new SslSocket(m_ioService, ((SSLChannelContext_t*)pContext)->getSslContext() );
+    m_pSocket=new SslSocket(m_ioService, ((SSLChannelContext_t*)m_pContext)->getSslContext() );
     if(m_pSocket!=NULL){
-        ret=Channel::init(pContext);
+        ret=Channel::init();
     }else{
         DRILL_LOG(LOG_ERROR) << "Channel initialization failure. " << std::endl;
         handleError(CONN_NOSOCKET, getMessage(ERR_CONN_NOSOCKET));

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/src/clientlib/channel.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/channel.hpp b/contrib/native/client/src/clientlib/channel.hpp
index 7f310e8..c7ebfee 100644
--- a/contrib/native/client/src/clientlib/channel.hpp
+++ b/contrib/native/client/src/clientlib/channel.hpp
@@ -36,9 +36,9 @@ class UserProperties;
             //parse the connection string and set up the host and port to connect to
             connectionStatus_t getDrillbitEndpoint();
 
-            std::string& getProtocol(){return m_protocol;}
-            std::string& getHost(){return m_host;}
-            std::string& getPort(){return m_port;}
+            const std::string& getProtocol() const {return m_protocol;}
+            const std::string& getHost() const {return m_host;}
+            const std::string& getPort() const {return m_port;}
             DrillClientError* getError(){ return m_pError;};
 
         private:
@@ -70,25 +70,21 @@ class UserProperties;
 
     class SSLChannelContext: public ChannelContext{
         public:
-            static boost::asio::ssl::context::method getTlsVersion(std::string version){
-                if(version.empty()){
-                    return boost::asio::ssl::context::tlsv12;
-                } else if (version == "tlsv12") {
+            static boost::asio::ssl::context::method getTlsVersion(const std::string & version){
+                if (version == "tlsv12") {
                     return boost::asio::ssl::context::tlsv12;
                 } else if (version == "tlsv11") {
                     return boost::asio::ssl::context::tlsv11;
-                } else if (version == "sslv23") {
-                    return boost::asio::ssl::context::sslv23;
                 } else if (version == "tlsv1") {
                     return boost::asio::ssl::context::tlsv1;
-                } else if (version == "sslv3") {
-                    return boost::asio::ssl::context::sslv3;
                 } else {
                     return boost::asio::ssl::context::tlsv12;
                 }
             }
 
-        SSLChannelContext(DrillUserProperties *props, boost::asio::ssl::context::method tlsVersion, boost::asio::ssl::verify_mode verifyMode) :
+        SSLChannelContext(DrillUserProperties *props,
+                          boost::asio::ssl::context::method tlsVersion,
+                          boost::asio::ssl::verify_mode verifyMode) :
                 ChannelContext(props),
                 m_SSLContext(tlsVersion) {
                 m_SSLContext.set_default_verify_paths();
@@ -108,11 +104,6 @@ class UserProperties;
     typedef ChannelContext ChannelContext_t; 
     typedef SSLChannelContext SSLChannelContext_t; 
 
-    class ChannelContextFactory{
-        public:
-            static ChannelContext_t* getChannelContext(channelType_t t, DrillUserProperties* props);
-    };
-
     /***
      * The Channel class encapsulates a connection to a drillbit. Based on 
      * the connection string and the options, the connection will be either 
@@ -122,13 +113,12 @@ class UserProperties;
      * will use to communicate with the server.
      ***/
     class Channel{
+        friend class ChannelFactory;
         public: 
-            Channel(const char* connStr);
-            Channel(const char* host, const char* port);
             Channel(boost::asio::io_service& ioService, const char* connStr);
             Channel(boost::asio::io_service& ioService, const char* host, const char* port);
             virtual ~Channel();
-            virtual connectionStatus_t init(ChannelContext_t* context)=0;
+            virtual connectionStatus_t init()=0;
             connectionStatus_t connect();
             bool isConnected(){ return m_state == CHANNEL_CONNECTED;}
             template <typename SettableSocketOption> void setOption(SettableSocketOption& option);
@@ -168,7 +158,6 @@ class UserProperties;
             ChannelContext_t *m_pContext;
 
         private:
-
             typedef enum channelState{ 
                 CHANNEL_UNINITIALIZED=1, 
                 CHANNEL_INITIALIZED, 
@@ -189,45 +178,42 @@ class UserProperties;
 
             channelState_t m_state;
             DrillClientError* m_pError;
-            bool m_ownIoService;
     };
 
     class SocketChannel: public Channel{
         public:
-            SocketChannel(const char* connStr):Channel(connStr){
-            }
-            SocketChannel(const char* host, const char* port):Channel(host, port){
-            }
             SocketChannel(boost::asio::io_service& ioService, const char* connStr)
                 :Channel(ioService, connStr){
             }
             SocketChannel(boost::asio::io_service& ioService, const char* host, const char* port)
                 :Channel(ioService, host, port){
             }
-            connectionStatus_t init(ChannelContext_t* context=NULL);
+            connectionStatus_t init();
     };
 
     class SSLStreamChannel: public Channel{
         public:
-            SSLStreamChannel(const char* connStr):Channel(connStr){
-            }
-            SSLStreamChannel(const char* host, const char* port):Channel(host, port){
-            }
             SSLStreamChannel(boost::asio::io_service& ioService, const char* connStr)
                 :Channel(ioService, connStr){
             }
             SSLStreamChannel(boost::asio::io_service& ioService, const char* host, const char* port)
                 :Channel(ioService, host, port){
             }
-            connectionStatus_t init(ChannelContext_t* context);
+            connectionStatus_t init();
     };
 
     class ChannelFactory{
         public:
-            static Channel* getChannel(channelType_t t, const char* connStr);
-            static Channel* getChannel(channelType_t t, const char* host, const char* port);
-            static Channel* getChannel(channelType_t t, boost::asio::io_service& ioService, const char* connStr);
-            static Channel* getChannel(channelType_t t, boost::asio::io_service& ioService, const char* host, const char* port);
+            static Channel* getChannel(channelType_t t,
+                                       boost::asio::io_service& ioService,
+                                       const char* connStr, DrillUserProperties* props);
+            static Channel* getChannel(channelType_t t,
+                                       boost::asio::io_service& ioService,
+                                       const char* host,
+                                       const char* port,
+                                       DrillUserProperties* props);
+        private:
+            static ChannelContext_t* getChannelContext(channelType_t t, DrillUserProperties* props);
     };
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 39ac847..f0bb636 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -19,31 +19,22 @@
 
 #include "drill/common.hpp"
 #include <queue>
-#include <string>
 #include <boost/algorithm/string.hpp>
 #include <boost/asio.hpp>
 #include <boost/assign.hpp>
 #include <boost/bind.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/date_time/posix_time/posix_time_duration.hpp>
 #include <boost/functional/factory.hpp>
-#include <boost/lexical_cast.hpp>
 #include <boost/thread.hpp>
 
 #include "drill/drillClient.hpp"
 #include "drill/fieldmeta.hpp"
 #include "drill/recordBatch.hpp"
+#include "drill/userProperties.hpp"
 #include "drillClientImpl.hpp"
-#include "collectionsImpl.hpp"
 #include "errmsgs.hpp"
 #include "logger.hpp"
-#include "metadata.hpp"
-#include "rpcMessage.hpp"
-#include "utils.hpp"
-#include "GeneralRPC.pb.h"
-#include "UserBitShared.pb.h"
 #include "zookeeperClient.hpp"
-#include "saslAuthenticatorImpl.hpp"
 
 namespace Drill{
 namespace { // anonymous namespace
@@ -65,108 +56,69 @@ struct ToRpcType: public std::unary_function<google::protobuf::int32, exec::user
 		return static_cast<exec::user::RpcType>(i);
 	}
 };
-}
-connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProperties* props){
-    std::string pathToDrill, protocol, hostPortStr;
-    std::string host;
-    std::string port;
+} // anonymous
 
+connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProperties* props){
     if (this->m_bIsConnected) {
-        if(std::strcmp(connStr, m_connectStr.c_str())){ // trying to connect to a different address is not allowed if already connected
+        if(!std::strcmp(connStr, m_connectStr.c_str())){
+            // trying to connect to a different address is not allowed if already connected
             return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
         }
         return CONN_SUCCESS;
     }
-
-    m_connectStr=connStr;
-    Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
-    if(protocol == "zk"){
-        ZookeeperClient zook(pathToDrill);
-        std::vector<std::string> drillbits;
-        int err = zook.getAllDrillbits(hostPortStr, drillbits);
-        if(!err){
-            if (drillbits.empty()){
-                return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_ZKNODBIT));
-            }
-            Utils::shuffle(drillbits);
-            exec::DrillbitEndpoint endpoint;
-            err = zook.getEndPoint(drillbits[drillbits.size() -1], endpoint);// get the last one in the list
-            if(!err){
-                host=boost::lexical_cast<std::string>(endpoint.address());
-                port=boost::lexical_cast<std::string>(endpoint.user_port());
-            }
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << (drillbits.size() - 1)  << ">. Selected " << endpoint.DebugString() << std::endl;)
-
-        }
-        if(err){
-            return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
-        }
-        zook.close();
-        m_bIsDirectConnection=true;
-    }else if(protocol == "local"){
-        boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant
-        char tempStr[MAX_CONNECT_STR+1];
-        strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
-        host=strtok(tempStr, ":");
-        port=strtok(NULL, "");
-        m_bIsDirectConnection=false;
-    }else{
-        return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
-    }
-    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;)
-    std::string serviceHost;
-    for (size_t i = 0; i < props->size(); i++) {
-        if (props->keyAt(i) == USERPROP_SERVICE_HOST) {
-            serviceHost = props->valueAt(i);
-        }
+    std::string val;
+    channelType_t type = ( props->isPropSet(USERPROP_USESSL) &&
+            props->getProp(USERPROP_USESSL, val) =="true") ?
+        CHANNEL_TYPE_SSLSTREAM :
+        CHANNEL_TYPE_SOCKET;
+
+    connectionStatus_t ret = CONN_SUCCESS;
+    m_pChannel= ChannelFactory::getChannel(type, m_io_service, connStr, props);
+    ret=m_pChannel->init();
+    if(ret!=CONN_SUCCESS){
+        handleConnError(m_pChannel->getError());
+        return ret;
     }
-    if (serviceHost.empty()) {
-        props->setProperty(USERPROP_SERVICE_HOST, host);
+    ret= m_pChannel->connect();
+    if(ret!=CONN_SUCCESS){
+        handleConnError(m_pChannel->getError());
+        return ret;
     }
-    connectionStatus_t ret = this->connect(host.c_str(), port.c_str());
+    props->setProperty(USERPROP_SERVICE_HOST, m_pChannel->getEndpoint()->getHost());
+    m_bIsConnected = true;
     return ret;
 }
 
-connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
-    using boost::asio::ip::tcp;
-    tcp::endpoint endpoint;
-    try{
-        tcp::resolver resolver(m_io_service);
-        tcp::resolver::query query(tcp::v4(), host, port);
-        tcp::resolver::iterator iter = resolver.resolve(query);
-        tcp::resolver::iterator end;
-        while (iter != end){
-            endpoint = *iter++;
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << endpoint << std::endl;)
-        }
-        boost::system::error_code ec;
-        m_socket.connect(endpoint, ec);
-        if(ec){
-            return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_FAILURE, host, port, ec.message().c_str()));
-        }
-
-    }catch(const std::exception & e){
-        // Handle case when the hostname cannot be resolved. "resolve" is hard-coded in boost asio resolver.resolve
-        if (!strcmp(e.what(), "resolve")) {
-            return handleConnError(CONN_HOSTNAME_RESOLUTION_ERROR, getMessage(ERR_CONN_EXCEPT, e.what()));
+connectionStatus_t DrillClientImpl::connect(const char* host, const char* port, DrillUserProperties* props){
+    if (this->m_bIsConnected) {
+        std::string connStr = std::string(host)+":"+std::string(port);
+        if(!std::strcmp(connStr.c_str(), m_connectStr.c_str())){
+            // trying to connect to a different address is not allowed if already connected
+            return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
         }
-        return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what()));
+        return CONN_SUCCESS;
     }
-
-    m_bIsConnected=true;
-    // set socket keep alive
-    boost::asio::socket_base::keep_alive keepAlive(true);
-    m_socket.set_option(keepAlive);
-    // set no_delay
-    boost::asio::ip::tcp::no_delay noDelay(true);
-    m_socket.set_option(noDelay);
-
-    std::ostringstream connectedHost;
-    connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port;
-    m_connectedHost = connectedHost.str();
-    DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;)
-
-    return CONN_SUCCESS;
+    std::string val;
+    channelType_t type = ( props->isPropSet(USERPROP_USESSL) &&
+            props->getProp(USERPROP_USESSL, val) =="true") ?
+        CHANNEL_TYPE_SSLSTREAM :
+        CHANNEL_TYPE_SOCKET;
+
+    connectionStatus_t ret = CONN_SUCCESS;
+    m_pChannel= ChannelFactory::getChannel(type, m_io_service, host, port, props);
+    ret=m_pChannel->init();
+    if(ret!=CONN_SUCCESS){
+        handleConnError(m_pChannel->getError());
+        return ret;
+    }
+    ret=m_pChannel->connect();
+    if(ret!=CONN_SUCCESS){
+        handleConnError(m_pChannel->getError());
+        return ret;
+    }
+    props->setProperty(USERPROP_SERVICE_HOST, m_pChannel->getEndpoint()->getHost());
+    m_bIsConnected = true;
+    return ret;
 }
 
 void DrillClientImpl::startHeartbeatTimer(){
@@ -250,7 +202,15 @@ void DrillClientImpl::doWriteToSocket(const char* dataPtr, size_t bytesToWrite,
     // Write all the bytes to socket. In case of error when all bytes are not successfully written
     // proper errorCode will be set.
     while(1) {
-        size_t bytesWritten = m_socket.write_some(boost::asio::buffer(dataPtr, bytesToWrite), errorCode);
+        size_t bytesWritten;
+        {
+            boost::lock_guard<boost::mutex> lock(m_channelMutex);
+            if(m_pChannel==NULL){
+                return;
+            }
+            bytesWritten = m_pChannel->getSocketStream().writeSome(boost::asio::buffer(dataPtr, bytesToWrite),
+                                                                          errorCode);
+        }
 
         if(errorCode && boost::asio::error::interrupted != errorCode){
             break;
@@ -359,8 +319,10 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
     }
 
     m_io_service.reset();
-    if (DrillClientConfig::getHandshakeTimeout() > 0){
-        m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHandshakeTimeout()));
+      
+    int32_t handshakeTimeout=DrillClientConfig::getHandshakeTimeout();
+    if (handshakeTimeout > 0){
+        m_deadlineTimer.expires_from_now(boost::posix_time::seconds(handshakeTimeout));
         m_deadlineTimer.async_wait(boost::bind(
                     &DrillClientImpl::handleHShakeReadTimeout,
                     this,
@@ -370,16 +332,21 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
                 << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl;)
     }
 
-    async_read(
-            this->m_socket,
-            boost::asio::buffer(m_rbuf, LEN_PREFIX_BUFLEN),
-            boost::bind(
-                &DrillClientImpl::handleHandshake,
-                this,
-                m_rbuf,
-                boost::asio::placeholders::error,
-                boost::asio::placeholders::bytes_transferred)
-            );
+    {
+        boost::lock_guard<boost::mutex> lock(m_channelMutex);
+        if (m_pChannel == NULL) {
+            return CONN_NOSOCKET;
+        }
+        m_pChannel->getSocketStream().asyncRead(
+                boost::asio::buffer(m_rbuf, LEN_PREFIX_BUFLEN),
+                boost::bind(
+                        &DrillClientImpl::handleHandshake,
+                        this,
+                        m_rbuf,
+                        boost::asio::placeholders::error,
+                        boost::asio::placeholders::bytes_transferred)
+        );
+    }
     DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n";)
     m_io_service.run();
     if(m_rbuf!=NULL){
@@ -418,8 +385,15 @@ void DrillClientImpl::doReadFromSocket(ByteBuf_t inBuf, size_t bytesToRead,
     // Read all the bytes. In case when all the bytes were not read the proper
     // errorCode will be set.
     while(1){
-        size_t dataBytesRead = m_socket.read_some(boost::asio::buffer(inBuf, bytesToRead),
+        size_t dataBytesRead;
+        {
+            boost::lock_guard<boost::mutex> lock(m_channelMutex);
+            if(m_pChannel==NULL){
+                return;
+            }
+            dataBytesRead = m_pChannel->getSocketStream().readSome(boost::asio::buffer(inBuf, bytesToRead),
                                            errorCode);
+        }
         // Check if errorCode is EINTR then just retry otherwise break from loop
         if(errorCode && boost::asio::error::interrupted != errorCode){
             break;
@@ -518,8 +492,10 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code &
                                               << "Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";)
             handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT));
             m_io_service.stop();
-            boost::system::error_code ignorederr;
-            m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
+            {
+                boost::lock_guard<boost::mutex> lock(m_channelMutex);
+                if(m_pChannel != NULL) m_pChannel->close();
+            }
         }
     }
     return;
@@ -1027,16 +1003,21 @@ void DrillClientImpl::getNextResult(){
 
     startHeartbeatTimer();
 
-    async_read(
-            this->m_socket,
-            boost::asio::buffer(readBuf, LEN_PREFIX_BUFLEN),
-            boost::bind(
-                &DrillClientImpl::handleRead,
-                this,
-                readBuf,
-                boost::asio::placeholders::error,
-                boost::asio::placeholders::bytes_transferred)
-            );
+    {
+        boost::lock_guard<boost::mutex> lock(m_channelMutex);
+        if (m_pChannel == NULL) {
+            return;
+        }
+        m_pChannel->getSocketStream().asyncRead(
+                boost::asio::buffer(readBuf, LEN_PREFIX_BUFLEN),
+                boost::bind(
+                        &DrillClientImpl::handleRead,
+                        this,
+                        readBuf,
+                        boost::asio::placeholders::error,
+                        boost::asio::placeholders::bytes_transferred)
+        );
+    }
     DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n";)
 }
 
@@ -1937,10 +1918,16 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
             // defined. To be really sure, we need to close the socket. Closing the socket is a bit
             // drastic and we will defer that till a later release.
 #ifdef WIN32_SHUTDOWN_ON_TIMEOUT
-            boost::system::error_code ignorederr;
-            m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
+            {
+                boost::lock_guard<boost::mutex> lock(m_channelMutex);
+                if(m_pChannel != NULL) m_pChannel->close();
+            }
+            m_pChannel->close();
 #else // NOT WIN32_SHUTDOWN_ON_TIMEOUT
-            m_socket.cancel();
+            {
+                boost::lock_guard<boost::mutex> lock(m_channelMutex);
+                if(m_pChannel != NULL) m_pChannel->getInnerSocket().cancel();
+            }
 #endif // WIN32_SHUTDOWN_ON_TIMEOUT
         }
     }
@@ -2149,6 +2136,20 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, c
     return status;
 }
 
+connectionStatus_t DrillClientImpl::handleConnError(DrillClientError* err){
+    DrillClientError* pErr = new DrillClientError(*err);
+    m_pendingRequests=0;
+    if(!m_queryHandles.empty()){
+        // set query error only if queries are running
+        broadcastError(pErr);
+    }else{
+        if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+        m_pError=pErr;
+        shutdownSocket();
+    }
+    return (connectionStatus_t)pErr->status;
+}
+
 /*
  * Always called with NULL QueryHandle when there is any error while reading data from socket. Once enough data is read
  * and a valid RPC message is formed then it can get called with NULL/valid QueryHandle depending on if QueryHandle is found
@@ -2268,9 +2269,16 @@ void DrillClientImpl::sendCancel(const exec::shared::QueryId* pQueryId){
 }
 
 void DrillClientImpl::shutdownSocket(){
+    m_pendingRequests=0;
+    m_heartbeatTimer.cancel();
+    m_deadlineTimer.cancel();
+    {
+        boost::lock_guard<boost::mutex> lock(m_channelMutex);
+        if (m_pChannel != NULL) {
+            m_pChannel->close();
+        }
+    }
     m_io_service.stop();
-    boost::system::error_code ignorederr;
-    m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
     m_bIsConnected=false;
 
     // Delete the saslAuthenticatorImpl instance since connection is broken. It will recreated on next
@@ -2697,7 +2705,7 @@ connectionStatus_t PooledDrillClientImpl::connect(const char* connStr, DrillUser
     }
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;)
         DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
-    stat =  pDrillClientImpl->connect(host.c_str(), port.c_str());
+    stat =  pDrillClientImpl->connect(host.c_str(), port.c_str(), props);
     if(stat == CONN_SUCCESS){
         boost::lock_guard<boost::mutex> lock(m_poolMutex);
         m_clientConnections.push_back(pDrillClientImpl);

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index dacc2c3..dc4a67e 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -45,6 +45,7 @@
 #include "drill/drillConfig.hpp"
 #include "drill/drillError.hpp"
 #include "drill/preparedStatement.hpp"
+#include "channel.hpp"
 #include "collectionsImpl.hpp"
 #include "metadata.hpp"
 #include "rpcMessage.hpp"
@@ -386,7 +387,8 @@ class DrillClientImpl : public DrillClientImplBase{
             m_pError(NULL),
             m_pListenerThread(NULL),
             m_pWork(NULL),
-            m_socket(m_io_service),
+            m_pChannel(NULL),
+            m_pChannelContext(NULL),
             m_deadlineTimer(m_io_service),
             m_heartbeatTimer(m_io_service),
             m_rbuf(NULL),
@@ -399,9 +401,11 @@ class DrillClientImpl : public DrillClientImplBase{
     };
 
         ~DrillClientImpl(){
-            //TODO: Cleanup.
-            //Free any record batches or buffers remaining
             //Cancel any pending requests
+            m_heartbeatTimer.cancel();
+            m_deadlineTimer.cancel();
+            m_io_service.stop();
+            //Free any record batches or buffers remaining
             //Clear and destroy DrillClientQueryResults vector?
             if(this->m_pWork!=NULL){
                 delete this->m_pWork;
@@ -411,13 +415,19 @@ class DrillClientImpl : public DrillClientImplBase{
                 delete this->m_saslAuthenticator;
                 this->m_saslAuthenticator = NULL;
             }
+            {
+                boost::lock_guard<boost::mutex> lock(m_channelMutex);
+                if (this->m_pChannel != NULL) {
+                    m_pChannel->close();
+                    delete this->m_pChannel;
+                    this->m_pChannel = NULL;
+                }
+                if (this->m_pChannelContext != NULL) {
+                    delete this->m_pChannelContext;
+                    this->m_pChannelContext = NULL;
+                }
+            }
 
-            m_heartbeatTimer.cancel();
-            m_deadlineTimer.cancel();
-            m_io_service.stop();
-            boost::system::error_code ignorederr;
-            m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
-            m_socket.close();
             if(m_rbuf!=NULL){
                 Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
             }
@@ -442,6 +452,8 @@ class DrillClientImpl : public DrillClientImplBase{
 
         //Connect via Zookeeper or directly
         connectionStatus_t connect(const char* connStr, DrillUserProperties* props);
+        connectionStatus_t connect(const char* host, const char* port, DrillUserProperties* props);
+
         // test whether the client is active
         bool Active();
         void Close() ;
@@ -523,6 +535,7 @@ class DrillClientImpl : public DrillClientImplBase{
         status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError);
         bool validateResultRPCType(DrillClientQueryHandle* pQueryHandle, const rpc::InBoundRpcMessage& msg);
         connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg);
+        connectionStatus_t handleConnError(DrillClientError* err);
         status_t handleQryCancellation(status_t status, DrillClientQueryResult* pQueryResult);
         status_t handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle);
         status_t handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryHandle* pQueryHandle);
@@ -603,7 +616,12 @@ class DrillClientImpl : public DrillClientImplBase{
         boost::asio::io_service m_io_service;
         // the work object prevent io_service running out of work
         boost::asio::io_service::work * m_pWork;
-        boost::asio::ip::tcp::socket m_socket;
+
+        // Mutex to protect channel
+        boost::mutex m_channelMutex;
+        Channel* m_pChannel;
+        ChannelContext_t* m_pChannelContext;
+
         boost::asio::deadline_timer m_deadlineTimer; // to timeout async queries that never return
         boost::asio::deadline_timer m_heartbeatTimer; // to send heartbeat messages
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/src/clientlib/drillConfig.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillConfig.cpp b/contrib/native/client/src/clientlib/drillConfig.cpp
index abaa79a..90a751a 100644
--- a/contrib/native/client/src/clientlib/drillConfig.cpp
+++ b/contrib/native/client/src/clientlib/drillConfig.cpp
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-
+#include <boost/thread/lock_guard.hpp>
 #include "drill/common.hpp"
 #include "drill/drillConfig.hpp"
 #include "env.h"

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/src/include/drill/common.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index 012bd19..d8e2da7 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -165,6 +165,10 @@ typedef enum{
     RET_FAILURE=1
 } ret_t;
 
+// Connect string protocol types
+#define PROTOCOL_TYPE_ZK     "zk"
+#define PROTOCOL_TYPE_DIRECT "drillbit"
+#define PROTOCOL_TYPE_DIRECT_2 "local"
 
 // User Property Names
 #define USERPROP_USERNAME "userName"
@@ -173,7 +177,8 @@ typedef enum{
 #define USERPROP_USESSL   "enableTLS"
 #define USERPROP_TLSPROTOCOL "TLSProtocol" //TLS version
 #define USERPROP_CERTFILEPATH "certFilePath" // pem file path and name
-#define USERPROP_CERTPASSWORD "certPassword" // Password for certificate file
+// TODO: support truststore protected by password. 
+// #define USERPROP_CERTPASSWORD "certPassword" // Password for certificate file. 
 #define USERPROP_DISABLE_HOSTVERIFICATION "disableHostVerification"
 #define USERPROP_DISABLE_CERTVERIFICATION "disableCertVerification"
 #define USERPROP_USESYSTEMTRUSTSTORE "useSystemTrustStore" //Windows only, use the system trust store

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/src/include/drill/drillConfig.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/drillConfig.hpp b/contrib/native/client/src/include/drill/drillConfig.hpp
index 669267d..46bbbb2 100644
--- a/contrib/native/client/src/include/drill/drillConfig.hpp
+++ b/contrib/native/client/src/include/drill/drillConfig.hpp
@@ -21,27 +21,7 @@
 #define DRILL_CONFIG_H
 
 #include "drill/common.hpp"
-#include <boost/thread.hpp>
-
-
-
-#if defined _WIN32 || defined __CYGWIN__
-  #ifdef DRILL_CLIENT_EXPORTS
-      #define DECLSPEC_DRILL_CLIENT __declspec(dllexport)
-  #else
-    #ifdef USE_STATIC_LIBDRILL
-      #define DECLSPEC_DRILL_CLIENT
-    #else
-      #define DECLSPEC_DRILL_CLIENT  __declspec(dllimport)
-    #endif
-  #endif
-#else
-  #if __GNUC__ >= 4
-    #define DECLSPEC_DRILL_CLIENT __attribute__ ((visibility ("default")))
-  #else
-    #define DECLSPEC_DRILL_CLIENT
-  #endif
-#endif
+#include <boost/thread/mutex.hpp>
 
 namespace exec{
     namespace shared{

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/src/include/drill/userProperties.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/userProperties.hpp b/contrib/native/client/src/include/drill/userProperties.hpp
index 3490dce..62a04f7 100644
--- a/contrib/native/client/src/include/drill/userProperties.hpp
+++ b/contrib/native/client/src/include/drill/userProperties.hpp
@@ -36,20 +36,17 @@ class DECLSPEC_DRILL_CLIENT DrillUserProperties{
 
         size_t size() const { return m_properties.size(); }
 
-        //const std::string& keyAt(size_t i) const { return m_properties.at(i).first; }
-
-        //const std::string& valueAt(size_t i) const { return m_properties.at(i).second; }
-
         const bool  isPropSet(const std::string& key) const{
             bool isSet=true;
-            auto f= m_properties.find(key);
+            std::map<std::string, std::string>::const_iterator f=m_properties.find(key);
             if(f==m_properties.end()){
                 isSet=false;
             }
             return isSet;
         }
+
         const std::string&  getProp(const std::string& key, std::string& value) const{
-            auto f= m_properties.find(key);
+            std::map<std::string, std::string>::const_iterator f=m_properties.find(key);
             if(f!=m_properties.end()){
                 value=f->second;
             }

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/contrib/native/client/test/ssl/testSSL.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/test/ssl/testSSL.cpp b/contrib/native/client/test/ssl/testSSL.cpp
index 3eaac48..4262566 100644
--- a/contrib/native/client/test/ssl/testSSL.cpp
+++ b/contrib/native/client/test/ssl/testSSL.cpp
@@ -330,6 +330,7 @@ int main(int argc, char* argv[]){
     std::string connectStr = "zk=localhost:2181/drill/drillbits1";
     //std::string connectStr = "drillbit=localhost:31090";
     channelType_t type;
+    boost::asio::io_service ioService;
 
     bool isSSL = argc==2 && !(strcmp(argv[1], "ssl"));
     type = CHANNEL_TYPE_SOCKET;
@@ -341,12 +342,10 @@ int main(int argc, char* argv[]){
     props.setProperty(USERPROP_PASSWORD, "admin");
     props.setProperty(USERPROP_CERTFILEPATH, "../../../test/ssl/drillTestCert.pem");
 
-    pChannelContext = ChannelContextFactory::getChannelContext(type, &props);
-
-    pChannel = ChannelFactory::getChannel(type, connectStr.c_str());
+    pChannel = ChannelFactory::getChannel(type, ioService, connectStr.c_str(), &props);
     if(pChannel != NULL){
         connectionStatus_t connStat;
-        connStat = pChannel->init(pChannelContext);
+        connStat = pChannel->init();
         if(connStat != CONN_SUCCESS){
             std::cout << "Init Failed." << std::endl;
             return -1;

http://git-wip-us.apache.org/repos/asf/drill/blob/f246c3ca/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9c8a304..a73a39b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -226,7 +226,8 @@
             <exclude>**/*.pb.cc</exclude>
             <exclude>**/*.pb.h</exclude>
             <exclude>**/*.linux</exclude>
-            <exclude>**/client/build/**</exclude>
+            <exclude>**/client/*build*/**</exclude>
+            <exclude>**/client/tags</exclude>
             <exclude>**/cmake_install.cmake</exclude>
             <exclude>**/ssl/*.csr</exclude>
             <exclude>**/*.tbl</exclude>


Mime
View raw message