drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [11/15] drill git commit: DRILL-4420: C++ API for metadata access and prepared statements
Date Tue, 01 Nov 2016 20:29:56 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/metadata.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/metadata.hpp b/contrib/native/client/src/clientlib/metadata.hpp
new file mode 100644
index 0000000..0cc8987
--- /dev/null
+++ b/contrib/native/client/src/clientlib/metadata.hpp
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef DRILL_METADATA_H
+#define DRILL_METADATA_H
+
+#include <boost/ref.hpp>
+
+#include "drill/common.hpp"
+#include "drill/drillClient.hpp"
+#include "env.h"
+#include "User.pb.h"
+
+namespace Drill {
+class DrillClientImpl;
+
+namespace meta {
+	class DrillCatalogMetadata: public meta::CatalogMetadata { // adapter: exposes an ::exec::user::CatalogMetadata message through the meta::CatalogMetadata interface
+	public:
+		DrillCatalogMetadata(const ::exec::user::CatalogMetadata& metadata):
+			meta::CatalogMetadata(),
+			m_pMetadata(metadata){ // wraps a reference to `metadata`; the message itself is not copied
+		}
+
+	  bool hasCatalogName() const { return m_pMetadata.get().has_catalog_name(); } // each hasX()/getX() pair below forwards to the wrapped message
+	  const std::string& getCatalogName() const { return m_pMetadata.get().catalog_name(); }
+
+	  bool hasDescription() const { return m_pMetadata.get().has_description(); }
+	  const std::string& getDescription() const { return m_pMetadata.get().description(); }
+
+	  bool hasConnect() const { return m_pMetadata.get().has_connect(); }
+	  const std::string& getConnect() const { return m_pMetadata.get().connect(); }
+
+	private:
+		boost::reference_wrapper<const ::exec::user::CatalogMetadata> m_pMetadata; // non-owning: the wrapped message must outlive this object
+	};
+
+	class DrillSchemaMetadata: public meta::SchemaMetadata { // adapter: exposes an ::exec::user::SchemaMetadata message through the meta::SchemaMetadata interface
+	public:
+		DrillSchemaMetadata(const ::exec::user::SchemaMetadata& metadata):
+			meta::SchemaMetadata(),
+			m_pMetadata(metadata){ // wraps a reference to `metadata`; the message itself is not copied
+		}
+
+		bool hasCatalogName() const { return m_pMetadata.get().has_catalog_name(); } // each hasX()/getX() pair below forwards to the wrapped message
+		const std::string& getCatalogName() const { return m_pMetadata.get().catalog_name(); }
+
+		bool hasSchemaName() const { return m_pMetadata.get().has_schema_name(); }
+		const std::string& getSchemaName() const { return m_pMetadata.get().schema_name(); }
+
+		bool hasOwnerName() const { return m_pMetadata.get().has_owner(); } // NOTE(review): presence check for getOwner(); the names differ ("OwnerName" vs "Owner") - confirm against the base-class declaration
+		const std::string& getOwner() const { return m_pMetadata.get().owner(); }
+
+		bool hasType() const { return m_pMetadata.get().has_type(); }
+		const std::string& getType() const { return m_pMetadata.get().type(); }
+
+		bool hasMutable() const { return m_pMetadata.get().has_mutable_(); } // accessor is spelled mutable_ on the message; `mutable` is a C++ keyword
+		const std::string& getMutable() const { return m_pMetadata.get().mutable_(); } // returned as a string, not a bool
+
+	private:
+		boost::reference_wrapper<const ::exec::user::SchemaMetadata> m_pMetadata; // non-owning: the wrapped message must outlive this object
+	};
+
+	class DrillTableMetadata: public meta::TableMetadata { // adapter: exposes an ::exec::user::TableMetadata message through the meta::TableMetadata interface
+	public:
+		DrillTableMetadata(const ::exec::user::TableMetadata& metadata):
+			meta::TableMetadata(),
+			m_pMetadata(metadata){ // wraps a reference to `metadata`; the message itself is not copied
+		}
+
+	  bool hasCatalogName() const { return m_pMetadata.get().has_catalog_name(); } // each hasX()/getX() pair below forwards to the wrapped message
+	  const std::string& getCatalogName() const { return m_pMetadata.get().catalog_name(); }
+
+	  bool hasSchemaName() const { return m_pMetadata.get().has_schema_name(); }
+	  const std::string& getSchemaName() const { return m_pMetadata.get().schema_name(); }
+
+	  bool hasTableName() const { return m_pMetadata.get().has_table_name(); }
+	  const std::string& getTableName() const { return m_pMetadata.get().table_name(); }
+
+	  bool hasType() const { return m_pMetadata.get().has_type(); }
+	  const std::string& getType() const { return m_pMetadata.get().type(); }
+
+	private:
+	  boost::reference_wrapper<const ::exec::user::TableMetadata> m_pMetadata; // non-owning: the wrapped message must outlive this object
+	};
+
+	class DrillColumnMetadata: public meta::ColumnMetadata { // adapter: exposes an ::exec::user::ColumnMetadata message through the meta::ColumnMetadata interface
+	public:
+		DrillColumnMetadata(const ::exec::user::ColumnMetadata& metadata):
+			meta::ColumnMetadata(),
+			m_pMetadata(metadata){ // wraps a reference to `metadata`; the message itself is not copied
+		}
+
+		bool hasCatalogName() const { return m_pMetadata.get().has_catalog_name(); } // each hasX()/getX() pair below forwards to the wrapped message
+		const std::string& getCatalogName() const { return m_pMetadata.get().catalog_name(); }
+
+		bool hasSchemaName() const { return m_pMetadata.get().has_schema_name(); }
+		const std::string& getSchemaName() const { return m_pMetadata.get().schema_name(); }
+
+		bool hasTableName() const { return m_pMetadata.get().has_table_name(); }
+		const std::string& getTableName() const { return m_pMetadata.get().table_name(); }
+
+		bool hasColumnName() const { return m_pMetadata.get().has_column_name(); }
+		const std::string& getColumnName() const { return m_pMetadata.get().column_name(); }
+
+		bool hasOrdinalPosition() const { return m_pMetadata.get().has_ordinal_position(); }
+		std::size_t getOrdinalPosition() const { return m_pMetadata.get().ordinal_position(); } // NOTE(review): implicitly converted to std::size_t (also getColumnSize/getCharMaxLength/getCharOctetLength); if the message field is signed a negative value would wrap - confirm
+
+		bool hasDefaultValue() const { return m_pMetadata.get().has_default_value(); }
+		const std::string& getDefaultValue() const { return m_pMetadata.get().default_value(); }
+
+		bool hasNullable() const { return m_pMetadata.get().has_is_nullable(); } // presence of the is_nullable field
+		bool isNullable() const { return m_pMetadata.get().is_nullable(); }
+
+		bool hasDataType() const { return m_pMetadata.get().has_data_type(); }
+		const std::string& getDataType() const { return m_pMetadata.get().data_type(); }
+
+		bool hasColumnSize() const { return m_pMetadata.get().has_column_size(); }
+		std::size_t getColumnSize() const { return m_pMetadata.get().column_size(); }
+
+		bool hasCharMaxLength() const { return m_pMetadata.get().has_char_max_length(); }
+		std::size_t getCharMaxLength() const { return m_pMetadata.get().char_max_length(); }
+
+		bool hasCharOctetLength() const { return m_pMetadata.get().has_char_octet_length(); }
+		std::size_t getCharOctetLength() const { return m_pMetadata.get().char_octet_length(); }
+
+		bool hasNumericPrecision() const { return m_pMetadata.get().has_numeric_precision(); }
+		int32_t getNumericPrecision() const { return m_pMetadata.get().numeric_precision(); }
+
+		bool hasNumericRadix() const { return m_pMetadata.get().has_numeric_precision_radix(); } // backed by the numeric_precision_radix field
+		int32_t getNumericRadix() const { return m_pMetadata.get().numeric_precision_radix(); }
+
+		bool hasNumericScale() const { return m_pMetadata.get().has_numeric_scale(); }
+		int32_t getNumericScale() const { return m_pMetadata.get().numeric_scale(); }
+
+		bool hasIntervalType() const { return m_pMetadata.get().has_interval_type(); }
+		const std::string& getIntervalType() const { return m_pMetadata.get().interval_type(); }
+
+		bool hasIntervalPrecision() const { return m_pMetadata.get().has_interval_precision(); }
+		int32_t getIntervalPrecision() const { return m_pMetadata.get().interval_precision(); }
+
+	private:
+		boost::reference_wrapper<const ::exec::user::ColumnMetadata> m_pMetadata; // non-owning: the wrapped message must outlive this object
+	};
+
+    class DrillMetadata: public Metadata { // Metadata implementation bound to a DrillClientImpl; the inline capability queries below return fixed values
+    public:
+        static const std::string s_connectorName; // the s_* statics are only declared in this header; their definitions live elsewhere
+        static const std::string s_connectorVersion; 
+
+        static const std::string s_serverName;
+        static const std::string s_serverVersion;
+
+        static const std::string s_catalogSeparator;
+        static const std::string s_catalogTerm;
+
+        static const std::string s_identifierQuoteString;
+        static const std::vector<std::string> s_sqlKeywords;
+        static const std::vector<std::string> s_numericFunctions;
+        static const std::string s_schemaTerm;
+        static const std::string s_searchEscapeString;
+        static const std::string s_specialCharacters;
+        static const std::vector<std::string> s_stringFunctions;
+        static const std::vector<std::string> s_systemFunctions;
+        static const std::string s_tableTerm;
+        static const std::vector<std::string> s_dateTimeFunctions;
+
+        DrillMetadata(DrillClientImpl& client): Metadata(), m_client(client) {} // keeps a reference to `client`; no copy is made
+        ~DrillMetadata() {}
+
+        DrillClientImpl& client() { return m_client; }
+
+        const std::string& getConnectorName() const { return s_connectorName; }; // NOTE(review): the ';' after the body is redundant
+        const std::string& getConnectorVersion() const { return s_connectorVersion; }
+        uint32_t getConnectorMajorVersion() const { return DRILL_VERSION_MAJOR; } // DRILL_VERSION_* are not defined in this header - presumably macros from env.h; confirm
+        uint32_t getConnectorMinorVersion() const { return DRILL_VERSION_MINOR; } 
+        uint32_t getConnectorPatchVersion() const { return DRILL_VERSION_PATCH; } 
+
+        const std::string& getServerName() const; // server name/version accessors are declared here only
+        const std::string& getServerVersion() const;
+        uint32_t getServerMajorVersion() const;
+        uint32_t getServerMinorVersion() const;
+        uint32_t getServerPatchVersion() const;
+
+        status_t getCatalogs(const std::string& catalogPattern, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle); // the four get* queries are declared here only; each takes pattern strings, a listener callback, an opaque listenerCtx and a QueryHandle_t*
+        status_t getSchemas(const std::string& catalogPattern, const std::string& schemaPattern, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle);
+        status_t getTables(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::vector<std::string>* tableTypes, Metadata::pfnTableMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle);
+        status_t getColumns(const std::string& catalogPattern, const std::string& schemaPattern, const std:: string& tablePattern, const std::string& columnPattern, Metadata::pfnColumnMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle);
+
+        bool areAllTableSelectable() const { return false; }
+        bool isCatalogAtStart() const { return true; }
+        const std::string& getCatalogSeparator() const { return s_catalogSeparator; }
+        const std::string& getCatalogTerm() const { return s_catalogTerm; }
+        bool isColumnAliasingSupported() const { return true; }
+        bool isNullPlusNonNullNull() const { return true; }
+        bool isConvertSupported(common::MinorType from, common::MinorType to) const;
+        meta::CorrelationNamesSupport getCorrelationNames() const { return meta::CN_ANY_NAMES; }
+        bool isReadOnly() const { return false; }
+        meta::DateTimeLiteralSupport getDateTimeLiteralsSupport() const {
+            return DL_DATE // DL_* are written unqualified (other enumerators here use meta::); lookup happens from inside namespace meta
+                | DL_TIME
+                | DL_TIMESTAMP
+                | DL_INTERVAL_YEAR
+                | DL_INTERVAL_MONTH
+                | DL_INTERVAL_DAY
+                | DL_INTERVAL_HOUR
+                | DL_INTERVAL_MINUTE
+                | DL_INTERVAL_SECOND
+                | DL_INTERVAL_YEAR_TO_MONTH
+                | DL_INTERVAL_DAY_TO_HOUR
+                | DL_INTERVAL_DAY_TO_MINUTE
+                | DL_INTERVAL_DAY_TO_SECOND
+                | DL_INTERVAL_HOUR_TO_MINUTE
+                | DL_INTERVAL_HOUR_TO_SECOND
+                | DL_INTERVAL_MINUTE_TO_SECOND;
+        }
+
+        meta::CollateSupport getCollateSupport() const { return meta::C_NONE; }// TODO(review): confirm whether collation is actually unsupported
+        meta::GroupBySupport getGroupBySupport() const { return meta::GB_UNRELATED; }
+        meta::IdentifierCase getIdentifierCase() const { return meta::IC_STORES_UPPER; } // TODO(review): verify that this value is correct
+
+        const std::string& getIdentifierQuoteString() const { return s_identifierQuoteString; }
+        const std::vector<std::string>& getSQLKeywords() const { return s_sqlKeywords; }
+        bool isLikeEscapeClauseSupported() const { return true; }
+        std::size_t getMaxBinaryLiteralLength() const { return 0; } // every getMax* query in this class returns 0; NOTE(review): presumably 0 means "no limit / unknown" - confirm against the Metadata interface
+        std::size_t getMaxCatalogNameLength() const { return 0; }
+        std::size_t getMaxCharLiteralLength() const { return 0; }
+        std::size_t getMaxColumnNameLength() const { return 0; }
+        std::size_t getMaxColumnsInGroupBy() const { return 0; }
+        std::size_t getMaxColumnsInOrderBy() const { return 0; }
+        std::size_t getMaxColumnsInSelect() const { return 0; }
+        std::size_t getMaxCursorNameLength() const { return 0; }
+        std::size_t getMaxLogicalLobSize() const { return 0; }
+        std::size_t getMaxStatements() const { return 0; }
+        std::size_t getMaxRowSize() const { return 0; }
+        bool isBlobIncludedInMaxRowSize() const { return true; }
+        std::size_t getMaxSchemaNameLength() const { return 0; }
+        std::size_t getMaxStatementLength() const { return 0; }
+        std::size_t getMaxTableNameLength() const { return 0; }
+        std::size_t getMaxTablesInSelect() const { return 0; }
+        std::size_t getMaxUserNameLength() const { return 0; }
+        meta::NullCollation getNullCollation() const { return meta::NC_AT_END; }
+        const std::vector<std::string>& getNumericFunctions() const { return s_numericFunctions; }
+        meta::OuterJoinSupport getOuterJoinSupport() const { return meta::OJ_LEFT 
+            | meta::OJ_RIGHT 
+            | meta::OJ_FULL;
+        }
+        bool isUnrelatedColumnsInOrderBySupported() const { return true; }
+        meta::QuotedIdentifierCase getQuotedIdentifierCase() const { return meta::QIC_SUPPORTS_MIXED; }
+        const std::string& getSchemaTerm() const { return s_schemaTerm; }
+        const std::string& getSearchEscapeString() const { return s_searchEscapeString; }
+        const std::string& getSpecialCharacters() const { return s_specialCharacters; }
+        const std::vector<std::string>& getStringFunctions() const { return s_stringFunctions; }
+        meta::SubQuerySupport getSubQuerySupport() const { return SQ_CORRELATED // SQ_* are written unqualified, like DL_* above
+                | SQ_IN_COMPARISON
+                | SQ_IN_EXISTS
+                | SQ_IN_QUANTIFIED;
+        }
+        const std::vector<std::string>& getSystemFunctions() const { return s_systemFunctions; }
+        const std::string& getTableTerm() const { return s_tableTerm; }
+        const std::vector<std::string>& getDateTimeFunctions() const { return s_dateTimeFunctions; }
+        bool isTransactionSupported() const { return false; }
+        meta::UnionSupport getUnionSupport() const { return meta::U_UNION | meta::U_UNION_ALL; }
+        bool isSelectForUpdateSupported() const { return false; }
+
+    private:
+        DrillClientImpl& m_client; // non-owning: the client must outlive this object
+    };
+} // namespace meta
+} // namespace Drill
+
+#endif // DRILL_METADATA_H

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/recordBatch.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/recordBatch.cpp b/contrib/native/client/src/clientlib/recordBatch.cpp
index c6c033b..6e13293 100644
--- a/contrib/native/client/src/clientlib/recordBatch.cpp
+++ b/contrib/native/client/src/clientlib/recordBatch.cpp
@@ -17,6 +17,7 @@
  */
 
 #include "drill/common.hpp"
+#include "drill/fieldmeta.hpp"
 #include "drill/recordBatch.hpp"
 #include "utils.hpp"
 #include "../protobuf/User.pb.h"
@@ -403,17 +404,6 @@ bool RecordBatch::isLastChunk(){
 
 
 
-void FieldMetadata::set(const exec::shared::SerializedField& f){
-    m_name=f.name_part().name();
-    m_minorType=f.major_type().minor_type();
-    m_dataMode=f.major_type().mode();
-    m_valueCount=f.value_count();
-    m_scale=f.major_type().scale();
-    m_precision=f.major_type().precision();
-    m_bufferLength=f.buffer_length();
-}
-
-
 void DateHolder::load(){
     m_year=1970;
     m_month=1;

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcDecoder.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcDecoder.cpp b/contrib/native/client/src/clientlib/rpcDecoder.cpp
deleted file mode 100644
index d3cf50c..0000000
--- a/contrib/native/client/src/clientlib/rpcDecoder.cpp
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#include <iostream>
-#include <google/protobuf/io/coded_stream.h>
-#include "drill/common.hpp"
-#include "rpcEncoder.hpp"
-#include "rpcDecoder.hpp"
-#include "rpcMessage.hpp"
-
-namespace Drill{
-
-// return the number of bytes we have read
-int RpcDecoder::LengthDecode(const uint8_t* buf, uint32_t* p_length) {
-
-    using google::protobuf::io::CodedInputStream;
-
-    // read the frame to get the length of the message and then
-
-    CodedInputStream* cis = new CodedInputStream(buf, 5); // read 5 bytes at most
-
-    int pos0 = cis->CurrentPosition(); // for debugging
-    cis->ReadVarint32(p_length);
-
-    #ifdef CODER_DEBUG
-    cerr << "p_length = " << *p_length << endl;
-    #endif
-
-    int pos1 = cis->CurrentPosition();
-
-    #ifdef CODER_DEBUG
-    cerr << "Reading full length " << *p_length << endl;
-    #endif
-    assert( (pos1-pos0) == getRawVarintSize(*p_length));
-    delete cis;
-    return (pos1-pos0);
-}
-
-// TODO: error handling
-//
-// - assume that the entire message is in the buffer and the buffer is constrained to this message
-// - easy to handle with raw arry in C++
-int RpcDecoder::Decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) {
-    using google::protobuf::io::CodedInputStream;
-
-    // if(!ctx.channel().isOpen()){ return; }
-
-    #ifdef  EXTRA_DEBUGGING
-    std::cerr <<  "\nInbound rpc message received." << std::endl;
-    #endif
-
-    CodedInputStream* cis = new CodedInputStream(buf, length);
-
-
-    int pos0 = cis->CurrentPosition(); // for debugging
-
-    int len_limit = cis->PushLimit(length);
-
-    uint32_t header_length = 0;
-    cis->ExpectTag(RpcEncoder::HEADER_TAG);
-    cis->ReadVarint32(&header_length);
-
-    #ifdef CODER_DEBUG
-    cerr << "Reading header length " << header_length << ", post read index " << cis->CurrentPosition() << endl;
-    #endif
-
-    exec::rpc::RpcHeader header;
-    int header_limit = cis->PushLimit(header_length);
-    header.ParseFromCodedStream(cis);
-    cis->PopLimit(header_limit);
-    msg.m_has_mode = header.has_mode();
-    msg.m_mode = header.mode();
-    msg.m_coord_id = header.coordination_id();
-    msg.m_has_rpc_type = header.has_rpc_type();
-    msg.m_rpc_type = header.rpc_type();
-
-    //if(RpcConstants.EXTRA_DEBUGGING) logger.debug(" post header read index {}", buffer.readerIndex());
-
-    // read the protobuf body into a buffer.
-    cis->ExpectTag(RpcEncoder::PROTOBUF_BODY_TAG);
-    uint32_t p_body_length = 0;
-    cis->ReadVarint32(&p_body_length);
-
-    #ifdef CODER_DEBUG
-    cerr << "Reading protobuf body length " << p_body_length << ", post read index " << cis->CurrentPosition() << endl;
-    #endif
-
-    msg.m_pbody.resize(p_body_length);
-    cis->ReadRaw(msg.m_pbody.data(),p_body_length);
-
-
-    // read the data body.
-    if (cis->BytesUntilLimit() > 0 ) {
-    #ifdef CODER_DEBUG
-        cerr << "Reading raw body, buffer has "<< cis->BytesUntilLimit() << " bytes available, current possion "<< cis->CurrentPosition()  << endl;
-    #endif
-        cis->ExpectTag(RpcEncoder::RAW_BODY_TAG);
-        uint32_t d_body_length = 0;
-        cis->ReadVarint32(&d_body_length);
-
-        if(cis->BytesUntilLimit() != d_body_length) {
-    #ifdef CODER_DEBUG
-            cerr << "Expected to receive a raw body of " << d_body_length << " bytes but received a buffer with " <<cis->BytesUntilLimit() << " bytes." << endl;
-    #endif
-        }
-        //msg.m_dbody.resize(d_body_length);
-        //cis->ReadRaw(msg.m_dbody.data(), d_body_length);
-        uint32_t currPos=cis->CurrentPosition();
-        cis->GetDirectBufferPointer((const void**)&msg.m_dbody, (int*)&d_body_length);
-        assert(msg.m_dbody==buf+currPos);
-        cis->Skip(d_body_length);
-    #ifdef CODER_DEBUG
-        cerr << "Read raw body of " << d_body_length << " bytes" << endl;
-    #endif
-    } else {
-    #ifdef CODER_DEBUG
-        cerr << "No need to read raw body, no readable bytes left." << endl;
-    #endif
-    }
-    cis->PopLimit(len_limit);
-
-
-    // return the rpc message.
-    // move the reader index forward so the next rpc call won't try to work with it.
-    // buffer.skipBytes(dBodyLength);
-    // messageCounter.incrementAndGet();
-    #ifdef CODER_DEBUG
-    cerr << "Inbound Rpc Message Decoded " << msg << endl;
-    #endif
-
-    int pos1 = cis->CurrentPosition();
-    assert((pos1-pos0) == length);
-    delete cis;
-    return (pos1-pos0);
-}
-
-}//namespace Drill

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcDecoder.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcDecoder.hpp b/contrib/native/client/src/clientlib/rpcDecoder.hpp
deleted file mode 100644
index dca49f7..0000000
--- a/contrib/native/client/src/clientlib/rpcDecoder.hpp
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#ifndef RPC_DECODER_H
-#define RPC_DECODER_H
-
-#include "rpcMessage.hpp"
-
-namespace Drill {
-
-class RpcDecoder {
-    public:
-        RpcDecoder() { }
-        ~RpcDecoder() { }
-        // bool Decode(const DataBuf& buf);
-        // bool Decode(const DataBuf& buf, InBoundRpcMessage& msg);
-        static int LengthDecode(const uint8_t* buf, uint32_t* length); // read the length prefix (at most 4 bytes)
-        static int Decode(const uint8_t* buf, int length, InBoundRpcMessage& msg);
-};
-
-} // namespace Drill
-#endif

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcEncoder.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcEncoder.cpp b/contrib/native/client/src/clientlib/rpcEncoder.cpp
deleted file mode 100644
index 2f354d7..0000000
--- a/contrib/native/client/src/clientlib/rpcEncoder.cpp
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-#include <google/protobuf/message_lite.h>
-#include <google/protobuf/wire_format_lite.h>
-
-#include "drill/common.hpp"
-#include "rpcEncoder.hpp"
-#include "rpcMessage.hpp"
-
-namespace Drill{
-
-using google::protobuf::internal::WireFormatLite;
-using exec::rpc::CompleteRpcMessage;
-
-const uint32_t RpcEncoder::HEADER_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kHeaderFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
-const uint32_t RpcEncoder::PROTOBUF_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kProtobufBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
-const uint32_t RpcEncoder::RAW_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kRawBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
-const uint32_t RpcEncoder::HEADER_TAG_LENGTH = getRawVarintSize(HEADER_TAG);
-const uint32_t RpcEncoder::PROTOBUF_BODY_TAG_LENGTH = getRawVarintSize(PROTOBUF_BODY_TAG);
-const uint32_t RpcEncoder::RAW_BODY_TAG_LENGTH = getRawVarintSize(RAW_BODY_TAG);
-
-
-bool RpcEncoder::Encode(DataBuf& buf, OutBoundRpcMessage& msg) {
-    using exec::rpc::RpcHeader;
-    using google::protobuf::io::CodedOutputStream;
-    using google::protobuf::io::ArrayOutputStream;
-    // Todo:
-    //
-    // - let a context manager to allocate a buffer `ByteBuf buf = ctx.alloc().buffer();`
-    // - builder pattern
-    //
-    #ifdef CODER_DEBUG
-    cerr << "\nEncoding outbound message " << msg << endl;
-    #endif
-
-    RpcHeader header;
-    header.set_mode(msg.m_mode);
-    header.set_coordination_id(msg.m_coord_id);
-    header.set_rpc_type(msg.m_rpc_type);
-
-    // calcute the length of the message
-    int header_length = header.ByteSize();
-    int proto_body_length = msg.m_pbody->ByteSize();
-    int full_length = HEADER_TAG_LENGTH + getRawVarintSize(header_length) + header_length + \
-                      PROTOBUF_BODY_TAG_LENGTH + getRawVarintSize(proto_body_length) + proto_body_length;
-
-    /*
-       if(raw_body_length > 0) {
-       full_length += (RAW_BODY_TAG_LENGTH + getRawVarintSize(raw_body_length) + raw_body_length);
-       }
-       */
-
-    buf.resize(full_length + getRawVarintSize(full_length));
-    ArrayOutputStream* os = new ArrayOutputStream(buf.data(), buf.size());
-    CodedOutputStream* cos = new CodedOutputStream(os);
-
-
-    #ifdef CODER_DEBUG
-    cerr << "Writing full length " << full_length << endl;
-    #endif
-
-    // write full length first (this is length delimited stream).
-    cos->WriteVarint32(full_length);
-
-    #ifdef CODER_DEBUG
-    cerr << "Writing header length " << header_length << endl;
-    #endif
-
-    cos->WriteVarint32(HEADER_TAG);
-    cos->WriteVarint32(header_length);
-
-    header.SerializeToCodedStream(cos);
-
-    // write protobuf body length and body
-    #ifdef CODER_DEBUG
-    cerr << "Writing protobuf body length " << proto_body_length << endl;
-    #endif
-
-    cos->WriteVarint32(PROTOBUF_BODY_TAG);
-    cos->WriteVarint32(proto_body_length);
-    msg.m_pbody->SerializeToCodedStream(cos);
-
-    delete os;
-    delete cos;
-
-    // Done! no read to write data body for client
-    return true;
-}
-
-} // namespace Drill

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcEncoder.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcEncoder.hpp b/contrib/native/client/src/clientlib/rpcEncoder.hpp
deleted file mode 100644
index a4a7216..0000000
--- a/contrib/native/client/src/clientlib/rpcEncoder.hpp
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#ifndef RPC_ENCODER_H
-#define RPC_ENCODER_H
-
-#include "rpcMessage.hpp"
-
-namespace Drill {
-
-class RpcEncoder {
-    public:
-        RpcEncoder() {}
-        ~RpcEncoder() { }
-        bool Encode(DataBuf& buf,OutBoundRpcMessage& msg);
-        static const uint32_t HEADER_TAG;
-        static const uint32_t PROTOBUF_BODY_TAG;
-        static const uint32_t RAW_BODY_TAG;
-        static const uint32_t HEADER_TAG_LENGTH;
-        static const uint32_t PROTOBUF_BODY_TAG_LENGTH;
-        static const uint32_t RAW_BODY_TAG_LENGTH;
-};
-
-// copy from java code
-inline int getRawVarintSize(uint32_t value) {
-    int count = 0;
-    while (true) {
-        if ((value & ~0x7F) == 0) {
-            count++;
-            return count;
-        } else {
-            count++;
-            value >>= 7;
-        }
-    }
-}
-
-} // namespace Drill
-#endif

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcMessage.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcMessage.cpp b/contrib/native/client/src/clientlib/rpcMessage.cpp
new file mode 100644
index 0000000..13cd7a8
--- /dev/null
+++ b/contrib/native/client/src/clientlib/rpcMessage.cpp
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/wire_format_lite.h>
+
+#include "drill/common.hpp"
+#include "rpcMessage.hpp"
+
+namespace Drill{
+namespace rpc {
+
+
+namespace {
+// File-local helpers and constants used by decode()/encode() below to frame
+// a CompleteRpcMessage field by field.
+using google::protobuf::internal::WireFormatLite;
+using google::protobuf::io::CodedOutputStream;
+using exec::rpc::CompleteRpcMessage;
+
+// Wire tags (field number + length-delimited wire type) of the three
+// CompleteRpcMessage fields.
+static const uint32_t HEADER_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kHeaderFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
+static const uint32_t PROTOBUF_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kProtobufBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
+static const uint32_t RAW_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kRawBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
+// Number of bytes each tag occupies once varint-encoded; used by encode() to
+// pre-compute the size of the outgoing frame.
+static const uint32_t HEADER_TAG_LENGTH = CodedOutputStream::VarintSize32(HEADER_TAG);
+static const uint32_t PROTOBUF_BODY_TAG_LENGTH = CodedOutputStream::VarintSize32(PROTOBUF_BODY_TAG);
+}
+
+/*
+ * Reads the varint length prefix found at the start of buf.
+ *
+ * buf    - start of the frame; it is treated as a 5-byte window, which is the
+ *          largest encoding of a 32-bit varint.
+ * length - receives the decoded value on success.
+ *
+ * Returns the number of bytes the prefix occupies. On a malformed prefix the
+ * function returns -1, which the std::size_t return type turns into the
+ * largest representable value.
+ */
+std::size_t lengthDecode(const uint8_t* buf, uint32_t& length) {
+    namespace pbio = google::protobuf::io;
+
+    // Read the frame prefix to get the length of the message that follows.
+    pbio::CodedInputStream input(buf, 5); // read 5 bytes at most
+
+    const int before = input.CurrentPosition(); // for debugging
+    const bool parsed = input.ReadVarint32(&length);
+    if (!parsed) {
+        return -1;
+    }
+
+    #ifdef CODER_DEBUG
+    std::cerr << "length = " << length << std::endl;
+    #endif
+
+    const int consumed = input.CurrentPosition() - before;
+
+    // The bytes consumed must match the canonical varint size of the value.
+    assert(consumed == pbio::CodedOutputStream::VarintSize32(length));
+    return consumed;
+}
+
+// TODO: error handling
+//
+// - assume that the entire message is in the buffer and the buffer is constrained to this message
+// - easy to handle with raw array in C++
+/*
+ * Decodes one inbound RPC frame (without its leading length prefix) into msg.
+ *
+ * Expected layout of buf[0, length):
+ *   HEADER_TAG        varint(header length)  RpcHeader bytes
+ *   PROTOBUF_BODY_TAG varint(body length)    protobuf body bytes
+ *   [RAW_BODY_TAG     varint(raw length)     raw body bytes]   (optional)
+ *
+ * buf    - start of the frame; must stay valid while msg.m_dbody is in use,
+ *          because the raw body is not copied: m_dbody points into buf.
+ * length - number of bytes in the frame.
+ * msg    - receives mode, coordination id, rpc type, a copy of the protobuf
+ *          body (m_pbody) and, when present, a pointer to the raw body.
+ *
+ * Returns true when the whole frame was consumed, false on any malformed or
+ * truncated input. msg may be partially filled when false is returned.
+ */
+bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) {
+    using google::protobuf::io::CodedInputStream;
+
+    // A negative length can never describe a valid frame.
+    if (length < 0) {
+        return false;
+    }
+
+    CodedInputStream cis(buf, length);
+
+    int startPos(cis.CurrentPosition()); // for debugging
+
+    CodedInputStream::Limit len_limit(cis.PushLimit(length));
+
+    uint32_t header_length(0);
+
+    if (!cis.ExpectTag(HEADER_TAG)) {
+        return false;
+    }
+
+    if (!cis.ReadVarint32(&header_length)) {
+        return false;
+    }
+
+    #ifdef CODER_DEBUG
+    std::cerr << "Reading header length " << header_length << ", post read index " << cis.CurrentPosition() << std::endl;
+    #endif
+
+    exec::rpc::RpcHeader header;
+    CodedInputStream::Limit header_limit(cis.PushLimit(header_length));
+
+    if (!header.ParseFromCodedStream(&cis)) {
+        return false;
+    }
+    cis.PopLimit(header_limit);
+
+    msg.m_has_mode = header.has_mode();
+    msg.m_mode = header.mode();
+    msg.m_coord_id = header.coordination_id();
+    msg.m_has_rpc_type = header.has_rpc_type();
+    msg.m_rpc_type = header.rpc_type();
+
+    // read the protobuf body into a buffer.
+    if (!cis.ExpectTag(PROTOBUF_BODY_TAG)) {
+        return false;
+    }
+
+    uint32_t pbody_length(0);
+    if (!cis.ReadVarint32(&pbody_length)) {
+        return false;
+    }
+
+    #ifdef CODER_DEBUG
+    std::cerr << "Reading protobuf body length " << pbody_length << ", post read index " << cis.CurrentPosition() << std::endl;
+    #endif
+
+    // Validate the advertised length against what is left in the frame before
+    // resizing, so a corrupt length cannot trigger a huge allocation.
+    const int pbody_available = cis.BytesUntilLimit();
+    if (pbody_available < 0 || pbody_length > static_cast<uint32_t>(pbody_available)) {
+        return false;
+    }
+
+    msg.m_pbody.resize(pbody_length);
+    if (!cis.ReadRaw(msg.m_pbody.data(), pbody_length)) {
+        return false;
+    }
+
+    // read the data body.
+    if (cis.BytesUntilLimit() > 0 ) {
+        #ifdef CODER_DEBUG
+        std::cerr << "Reading raw body, buffer has "<< cis.BytesUntilLimit() << " bytes available, current possion "<< cis.CurrentPosition()  << std::endl;
+        #endif
+        if (!cis.ExpectTag(RAW_BODY_TAG)) {
+            return false;
+        }
+
+        uint32_t dbody_length = 0;
+        if (!cis.ReadVarint32(&dbody_length)) {
+            return false;
+        }
+
+        // The raw body must account for every remaining byte of the frame.
+        const int dbody_available = cis.BytesUntilLimit();
+        if (dbody_available < 0 || static_cast<uint32_t>(dbody_available) != dbody_length) {
+            #ifdef CODER_DEBUG
+            std::cerr << "Expected to receive a raw body of " << dbody_length << " bytes but received a buffer with " << cis.BytesUntilLimit() << " bytes." << std::endl;
+            #endif
+            return false;
+        }
+
+        // An empty raw body leaves nothing to point at; GetDirectBufferPointer
+        // would fail and leave its outputs untouched, so skip it entirely.
+        if (dbody_length > 0) {
+            int currPos(cis.CurrentPosition());
+            int size = 0;
+            // Zero-copy: m_dbody is made to point directly into buf.
+            if (!cis.GetDirectBufferPointer(const_cast<const void**>(reinterpret_cast<void**>(&msg.m_dbody)), &size)
+                    || size < 0
+                    || static_cast<uint32_t>(size) != dbody_length) {
+                return false;
+            }
+            cis.Skip(size);
+
+            assert(msg.m_dbody==buf+currPos);
+            (void) currPos; // only read by the assert above
+        }
+        #ifdef CODER_DEBUG
+        std::cerr << "Read raw body of " << dbody_length << " bytes" << std::endl;
+        #endif
+    } else {
+        #ifdef CODER_DEBUG
+        std::cerr << "No need to read raw body, no readable bytes left." << std::endl;
+        #endif
+    }
+    cis.PopLimit(len_limit);
+
+
+    // return the rpc message.
+    // move the reader index forward so the next rpc call won't try to work with it.
+    // buffer.skipBytes(dBodyLength);
+    // messageCounter.incrementAndGet();
+    #ifdef CODER_DEBUG
+    std::cerr << "Inbound Rpc Message Decoded " << msg << std::endl;
+    #endif
+
+    int endPos = cis.CurrentPosition();
+    assert((endPos-startPos) == length);
+    (void) startPos; // only read by the assert above
+    (void) endPos;
+    return true;
+}
+
+
+/*
+ * Serializes an outbound RPC message into buf as a length-prefixed frame.
+ *
+ * Frame layout written to buf:
+ *   varint(full_length)
+ *   HEADER_TAG        varint(header_length)      RpcHeader bytes
+ *   PROTOBUF_BODY_TAG varint(proto_body_length)  protobuf body bytes
+ * where full_length covers everything after the leading varint. No raw data
+ * body is written.
+ *
+ * buf - resized to exactly the size of the frame and then filled in.
+ * msg - supplies mode, coordination id, rpc type and the protobuf body
+ *       (m_pbody is dereferenced, so it is assumed to be non-null -- TODO
+ *       confirm against callers).
+ *
+ * Always returns true.
+ */
+bool encode(DataBuf& buf, const OutBoundRpcMessage& msg) {
+    using exec::rpc::RpcHeader;
+    using google::protobuf::io::CodedOutputStream;
+    // TODO:
+    //
+    // - let a context manager allocate the buffer: `ByteBuf buf = ctx.alloc().buffer();`
+    // - builder pattern
+    //
+    #ifdef CODER_DEBUG
+    std::cerr << "Encoding outbound message " << msg << std::endl;
+    #endif
+
+    RpcHeader header;
+    header.set_mode(msg.m_mode);
+    header.set_coordination_id(msg.m_coord_id);
+    header.set_rpc_type(msg.m_rpc_type);
+
+    // calculate the length of the message
+    // (ByteSize() is called here; the SerializeWithCachedSizesToArray() calls
+    // below are made after it for both the header and the body)
+    int header_length = header.ByteSize();
+    int proto_body_length = msg.m_pbody->ByteSize();
+    int full_length = HEADER_TAG_LENGTH + CodedOutputStream::VarintSize32(header_length) + header_length + \
+                      PROTOBUF_BODY_TAG_LENGTH + CodedOutputStream::VarintSize32(proto_body_length) + proto_body_length;
+
+    /*
+       if(raw_body_length > 0) {
+       full_length += (RAW_BODY_TAG_LENGTH + getRawVarintSize(raw_body_length) + raw_body_length);
+       }
+       */
+
+    // room for the frame plus its own varint length prefix
+    buf.resize(full_length + CodedOutputStream::VarintSize32(full_length));
+
+    // write cursor; each Write*/Serialize* call returns the position just
+    // past what it wrote
+    uint8_t* data = buf.data();
+
+    #ifdef CODER_DEBUG
+    std::cerr << "Writing full length " << full_length << std::endl;
+    #endif
+
+    data = CodedOutputStream::WriteVarint32ToArray(full_length, data);
+
+    #ifdef CODER_DEBUG
+    std::cerr << "Writing header length " << header_length << std::endl;
+    #endif
+
+    data = CodedOutputStream::WriteVarint32ToArray(HEADER_TAG, data);
+    data = CodedOutputStream::WriteVarint32ToArray(header_length, data);
+
+    data = header.SerializeWithCachedSizesToArray(data);
+
+    // write protobuf body length and body
+    #ifdef CODER_DEBUG
+    std::cerr << "Writing protobuf body length " << proto_body_length << std::endl;
+    #endif
+
+    data = CodedOutputStream::WriteVarint32ToArray(PROTOBUF_BODY_TAG, data);
+    data = CodedOutputStream::WriteVarint32ToArray(proto_body_length, data);
+    msg.m_pbody->SerializeWithCachedSizesToArray(data);
+
+    // Done! No need to write a raw data body for the client.
+    return true;
+}
+} // namespace rpc
+} // namespace Drill

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcMessage.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcMessage.hpp b/contrib/native/client/src/clientlib/rpcMessage.hpp
index 6696971..15487e9 100644
--- a/contrib/native/client/src/clientlib/rpcMessage.hpp
+++ b/contrib/native/client/src/clientlib/rpcMessage.hpp
@@ -25,8 +25,8 @@
 #include "GeneralRPC.pb.h"
 
 namespace Drill {
-
-class InBoundRpcMessage {
+namespace rpc {
+struct InBoundRpcMessage {
     public:
         exec::rpc::RpcMode m_mode;
         int m_rpc_type;
@@ -39,7 +39,7 @@ class InBoundRpcMessage {
         bool has_rpc_type() { return m_has_rpc_type; };
 };
 
-class OutBoundRpcMessage {
+struct OutBoundRpcMessage {
     public:
         exec::rpc::RpcMode m_mode;
         int m_rpc_type;
@@ -49,6 +49,13 @@ class OutBoundRpcMessage {
             m_mode(mode), m_rpc_type(rpc_type), m_coord_id(coord_id), m_pbody(pbody) { }
 };
 
-}
+std::size_t lengthDecode(const uint8_t* buf, uint32_t& length);
+
+bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg);
+
+bool encode(DataBuf& buf, const OutBoundRpcMessage& msg);
+
+} // namespace rpc
+} // namespace Drill
 
 #endif

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/utils.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/utils.cpp b/contrib/native/client/src/clientlib/utils.cpp
index 1e6a877..d3c8f08 100644
--- a/contrib/native/client/src/clientlib/utils.cpp
+++ b/contrib/native/client/src/clientlib/utils.cpp
@@ -22,6 +22,13 @@
 #include "logger.hpp"
 #include "drill/common.hpp"
 
+#if defined _WIN32  || defined _WIN64
+//Windows header files redefine 'max'
+#ifdef max
+#undef max
+#endif
+#endif
+
 namespace Drill{
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/utils.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/utils.hpp b/contrib/native/client/src/clientlib/utils.hpp
index 3237aa3..4cd8fa5 100644
--- a/contrib/native/client/src/clientlib/utils.hpp
+++ b/contrib/native/client/src/clientlib/utils.hpp
@@ -31,7 +31,6 @@
     #undef random
   #endif
 #endif
-#include <boost/asio/deadline_timer.hpp>
 #include <boost/random/mersenne_twister.hpp> // for mt19937
 #include <boost/random/random_device.hpp>
 #include <boost/random/uniform_int.hpp>

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/y2038/time64.c
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/y2038/time64.c b/contrib/native/client/src/clientlib/y2038/time64.c
index e0d61c8..bbbabe2 100644
--- a/contrib/native/client/src/clientlib/y2038/time64.c
+++ b/contrib/native/client/src/clientlib/y2038/time64.c
@@ -110,15 +110,15 @@ static const int safe_years_low[SOLAR_CYCLE_LENGTH] = {
 };
 
 /* This isn't used, but it's handy to look at */
-static const char dow_year_start[SOLAR_CYCLE_LENGTH] = {
-    5, 0, 1, 2,     /* 0       2016 - 2019 */
-    3, 5, 6, 0,     /* 4  */
-    1, 3, 4, 5,     /* 8       1996 - 1998, 1971*/
-    6, 1, 2, 3,     /* 12      1972 - 1975 */
-    4, 6, 0, 1,     /* 16 */
-    2, 4, 5, 6,     /* 20      2036, 2037, 2010, 2011 */
-    0, 2, 3, 4      /* 24      2012, 2013, 2014, 2015 */
-};
+//static const char dow_year_start[SOLAR_CYCLE_LENGTH] = {
+//    5, 0, 1, 2,     /* 0       2016 - 2019 */
+//    3, 5, 6, 0,     /* 4  */
+//    1, 3, 4, 5,     /* 8       1996 - 1998, 1971*/
+//    6, 1, 2, 3,     /* 12      1972 - 1975 */
+//    4, 6, 0, 1,     /* 16 */
+//    2, 4, 5, 6,     /* 20      2036, 2037, 2010, 2011 */
+//    0, 2, 3, 4      /* 24      2012, 2013, 2014, 2015 */
+//};
 
 /* Let's assume people are going to be looking for dates in the future.
    Let's provide some cheats so you can skip ahead.

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/zookeeperClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/zookeeperClient.cpp b/contrib/native/client/src/clientlib/zookeeperClient.cpp
new file mode 100644
index 0000000..535bebc
--- /dev/null
+++ b/contrib/native/client/src/clientlib/zookeeperClient.cpp
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <boost/bind.hpp>
+#include <drill/drillClient.hpp>
+#include "zookeeperClient.hpp"
+
+#include "errmsgs.hpp"
+#include "logger.hpp"
+
+namespace Drill {
+std::string ZookeeperClient::s_defaultDrillPath("/drill/drillbits1");
+// C-style trampoline handed to zookeeper_init(): the opaque context pointer
+// is the ZookeeperClient that registered it, so forward the event to its
+// watcher() member.
+static void watcherCallback(zhandle_t *zzh, int type, int state, const char *path, void* context) {
+    ZookeeperClient* const client = static_cast<ZookeeperClient*>(context);
+    client->watcher(zzh, type, state, path, context);
+}
+
+// Builds a client for the given cluster path. No connection is attempted
+// here; the handle stays empty until getAllDrillbits() is called.
+ZookeeperClient::ZookeeperClient(const std::string& drillPath)
+    : p_zh(),
+      m_state(),
+      m_bConnecting(true),
+      m_path(drillPath) {
+    // Start from an all-zero client id.
+    memset(&m_id, 0, sizeof(m_id));
+}
+
+// Nothing to release explicitly: members clean up after themselves.
+ZookeeperClient::~ZookeeperClient(){
+}
+
+// Translates the Drill client log level into the closest ZooKeeper log level.
+//   LOG_TRACE, LOG_DEBUG            -> ZOO_LOG_LEVEL_DEBUG
+//   LOG_INFO                        -> ZOO_LOG_LEVEL_INFO
+//   LOG_WARNING                     -> ZOO_LOG_LEVEL_WARN
+//   LOG_ERROR, LOG_FATAL, others    -> ZOO_LOG_LEVEL_ERROR
+ZooLogLevel ZookeeperClient::getZkLogLevel(){
+    // Error is the fallback for every level not mapped explicitly below.
+    ZooLogLevel zkLevel = ZOO_LOG_LEVEL_ERROR;
+
+    switch(DrillClientConfig::getLogLevel()){
+        case LOG_TRACE:
+        case LOG_DEBUG:
+            zkLevel = ZOO_LOG_LEVEL_DEBUG;
+            break;
+        case LOG_INFO:
+            zkLevel = ZOO_LOG_LEVEL_INFO;
+            break;
+        case LOG_WARNING:
+            zkLevel = ZOO_LOG_LEVEL_WARN;
+            break;
+        case LOG_ERROR:
+        case LOG_FATAL:
+        default:
+            break;
+    }
+    return zkLevel;
+}
+
+// Event handler invoked (through watcherCallback) for ZooKeeper events.
+//
+// Records the reported state, and for session events that report an
+// authentication failure or an expired session stores an error message and
+// closes the handle. In every case it then clears m_bConnecting under
+// m_cvMutex and notifies m_cv, which is what getAllDrillbits() waits on.
+//
+// zzh  - handle the event was raised on (unused here).
+// type - event type; only ZOO_SESSION_EVENT gets special handling.
+// state - connection state reported with the event.
+// path - znode path associated with the event (unused here).
+void ZookeeperClient::watcher(zhandle_t *zzh, int type, int state, const char *path, void*) {
+    //From cli.c
+
+    /* Be careful using zh here rather than zzh - as this may be mt code
+     * the client lib may call the watcher before zookeeper_init returns */
+
+    // NOTE(review): m_state and m_err are written here without holding
+    // m_cvMutex -- confirm this is safe with respect to the waiting thread.
+    this->m_state=state;
+    if (type == ZOO_SESSION_EVENT) {
+        if (state == ZOO_CONNECTED_STATE) {
+            // connected: nothing to record beyond m_state
+        } else if (state == ZOO_AUTH_FAILED_STATE) {
+            this->m_err= getMessage(ERR_CONN_ZKNOAUTH);
+            this->close();
+        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
+        	this->m_err= getMessage(ERR_CONN_ZKEXP);
+        	this->close();
+        }
+    }
+    // signal the cond var
+    {
+        if (state == ZOO_CONNECTED_STATE){
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;)
+        }
+        // The flag is flipped under the mutex; the lock is released at the
+        // end of this scope, before the notification below.
+        boost::lock_guard<boost::mutex> bufferLock(this->m_cvMutex);
+        this->m_bConnecting=false;
+    }
+    this->m_cv.notify_one();
+}
+
+// Connects to ZooKeeper and appends the names of all drillbit znodes found
+// under m_path to drillbits (in the order ZooKeeper returns them).
+//
+// connectStr - comma separated host:port pairs, each corresponding to a zk
+//              server, e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+// drillbits  - output list; entries are appended, existing ones are kept.
+//
+// Returns 0 on success and -1 on failure; on failure getError() holds a
+// description of what went wrong.
+int ZookeeperClient::getAllDrillbits(const std::string& connectStr, std::vector<std::string>& drillbits){
+    uint32_t waitTime=30000; // 30 seconds
+    zoo_set_debug_level(getZkLogLevel());
+    zoo_deterministic_conn_order(1); // enable deterministic order
+
+    // Clear any stale error *before* connecting: the watcher may run (and
+    // record an error) before zookeeper_init returns, and that error must
+    // not be wiped afterwards.
+    m_err="";
+
+    p_zh = boost::shared_ptr<zhandle_t>(zookeeper_init(connectStr.c_str(), &watcherCallback, waitTime, &m_id, this, 0), zookeeper_close);
+    if(!p_zh) {
+        m_err = getMessage(ERR_CONN_ZKFAIL);
+        return -1;
+    }
+
+    //Wait for the completion handler to signal successful connection
+    boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex);
+    boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime);
+    while(this->m_bConnecting) {
+        if(!this->m_cv.timed_wait(bufferLock, timeout)){
+            m_err = getMessage(ERR_CONN_ZKTIMOUT);
+            return -1;
+        }
+    }
+
+    if(m_state!=ZOO_CONNECTED_STATE){
+        // The watcher normally explains the failure; make sure the caller
+        // never gets -1 together with an empty error message.
+        if(m_err.empty()){
+            m_err = getMessage(ERR_CONN_ZKFAIL);
+        }
+        return -1;
+    }
+
+    int rc = ZOK;
+
+    struct String_vector drillbitsVector;
+    rc=zoo_get_children(p_zh.get(), m_path.c_str(), 0, &drillbitsVector);
+    if(rc!=ZOK){
+        m_err=getMessage(ERR_CONN_ZKERR, rc);
+        p_zh.reset();
+        return -1;
+    }
+
+    // Make sure we deallocate drillbitsVector properly when we exit
+    boost::shared_ptr<String_vector> guard(&drillbitsVector, deallocate_String_vector);
+
+    if(drillbitsVector.count > 0){
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << drillbitsVector.count << " drillbits in cluster ("
+                << connectStr << "/" << m_path
+                << ")." <<std::endl;)
+        for(int i=0; i<drillbitsVector.count; i++){
+            drillbits.push_back(drillbitsVector.data[i]);
+        }
+        for(std::vector<std::string>::size_type i=0; i<drillbits.size(); i++){
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " << drillbits[i] << std::endl;)
+        }
+    }
+    return 0;
+}
+
+// Reads the znode of the named drillbit and extracts its endpoint.
+//
+// drillbit - znode name as returned by getAllDrillbits().
+// endpoint - receives the endpoint stored in the znode on success.
+//
+// Returns 0 on success and -1 on failure; on failure getError() holds a
+// description and endpoint is left untouched.
+int ZookeeperClient::getEndPoint(const std::string& drillbit, exec::DrillbitEndpoint& endpoint){
+    // Full znode path of the requested drillbit under the cluster path.
+    std::string s(m_path +  "/" + drillbit);
+    int buffer_len=MAX_CONNECT_STR;
+    char buffer[MAX_CONNECT_STR+1];
+    struct Stat stat;
+    buffer[MAX_CONNECT_STR]=0;
+    int rc= zoo_get(p_zh.get(), s.c_str(), 0, buffer,  &buffer_len, &stat);
+    if(rc!=ZOK){
+        m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
+        return -1;
+    }
+
+    // zoo_get reports a znode without data as length -1, and the payload may
+    // not be a valid DrillServiceInstance; both cases are reported as an
+    // unmarshalling error instead of handing back an empty endpoint.
+    exec::DrillServiceInstance drillServiceInstance;
+    if(buffer_len<0 || !drillServiceInstance.ParseFromArray(buffer, buffer_len)){
+        m_err=getMessage(ERR_CONN_ZKDBITERR, ZMARSHALLINGERROR);
+        return -1;
+    }
+    endpoint=drillServiceInstance.endpoint();
+
+    return 0;
+}
+
+// Drops this client's reference to the ZooKeeper handle. The handle is
+// created in getAllDrillbits() with zookeeper_close as its deleter.
+void ZookeeperClient::close(){
+	p_zh.reset();
+}
+
+} /* namespace Drill */

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/zookeeperClient.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/zookeeperClient.hpp b/contrib/native/client/src/clientlib/zookeeperClient.hpp
new file mode 100644
index 0000000..25d6af5
--- /dev/null
+++ b/contrib/native/client/src/clientlib/zookeeperClient.hpp
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef _WIN32
+#include <zookeeper.h>
+#else
+#include <zookeeper/zookeeper.h>
+#endif
+
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "UserBitShared.pb.h"
+
+
+#ifndef ZOOKEEPER_CLIENT_H
+#define ZOOKEEPER_CLIENT_H
+
+namespace Drill {
+// Thin wrapper around the ZooKeeper C client used to discover drillbits
+// registered under a cluster path.
+class ZookeeperClient{
+    public:
+		// Cluster path used when the constructor is called without one.
+		static std::string s_defaultDrillPath;
+
+        // drillPath: znode path under which the drillbits are listed.
+        ZookeeperClient(const std::string& drillPath = s_defaultDrillPath);
+        ~ZookeeperClient();
+        // Maps the Drill client log level to a ZooKeeper log level.
+        static ZooLogLevel getZkLogLevel();
+        // Releases the ZooKeeper handle held by this client.
+        void close();
+        // Message describing the most recent failure (empty if none).
+        const std::string& getError() const{return m_err;}
+        // return unshuffled list of drillbits
+        // connectStr: comma separated host:port pairs, each corresponding to a zk
+        // server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+        int getAllDrillbits(const std::string& connectStr, std::vector<std::string>& drillbits);
+        // looks up the named drillbit and returns the corresponding endpoint object
+        int getEndPoint(const std::string& drillbit, exec::DrillbitEndpoint& endpoint);
+
+        // Event handler; invoked via the static callback registered with
+        // zookeeper_init.
+        void watcher(zhandle_t *zzh, int type, int state, const char *path, void* context);
+
+    private:
+        // ZooKeeper handle; empty until getAllDrillbits() connects.
+        boost::shared_ptr<zhandle_t> p_zh;
+        clientid_t m_id;
+        // Last connection state reported to watcher().
+        int m_state;
+        std::string m_err;
+
+        // Guards m_bConnecting together with m_cv.
+        boost::mutex m_cvMutex;
+        // Condition variable to signal connection callback has been processed
+        boost::condition_variable m_cv;
+        // True until the first watcher callback has been processed.
+        bool m_bConnecting;
+        std::string m_path;
+
+};
+} /* namespace Drill */
+
+
+
+#endif /* ZOOKEEPER_CLIENT_H */

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/include/drill/collections.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/collections.hpp b/contrib/native/client/src/include/drill/collections.hpp
new file mode 100644
index 0000000..9fbfcc5
--- /dev/null
+++ b/contrib/native/client/src/include/drill/collections.hpp
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DRILL_COLLECTIONS_H
+#define _DRILL_COLLECTIONS_H
+
+#include <iterator>
+
+#include <boost/noncopyable.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace Drill {
+namespace impl {
+
+/**
+ * Interface for internal iterators
+ */
+template<typename T>
+class DrillIteratorImpl: private boost::noncopyable {
+public:
+	// Self type and the shared pointer type used to pass iterators around.
+	typedef DrillIteratorImpl<T> iterator;
+	typedef boost::shared_ptr<iterator> iterator_ptr;
+
+	typedef T value_type;
+	typedef value_type& reference;
+	typedef value_type* pointer;
+
+	virtual ~DrillIteratorImpl() {};
+
+	// To allow conversion from non-const to const types
+	virtual operator typename DrillIteratorImpl<const T>::iterator_ptr() const = 0;
+
+	// Access to the element the iterator currently designates.
+	virtual reference operator*() const = 0;
+	virtual pointer   operator->() const = 0;
+
+	// Pre-increment: advances the iterator and returns it.
+	virtual iterator& operator++() = 0;
+
+	// Comparison against another implementation iterator of the same type.
+	virtual bool operator==(const iterator& x) const = 0;
+	virtual bool operator!=(const iterator& x) const = 0;
+};
+
+/**
+ * Interface for internal collections
+ */
+template<typename T>
+class DrillCollectionImpl: private boost::noncopyable {
+public:
+	// STL-like iterator typedef
+	// (iterators are handed out as shared pointers to implementation objects)
+	typedef DrillIteratorImpl<T> iterator;
+	typedef boost::shared_ptr<iterator> iterator_ptr;
+	typedef DrillIteratorImpl<const T> const_iterator;
+	typedef boost::shared_ptr<const_iterator> const_iterator_ptr;
+
+	// STL-like element typedefs
+	typedef T value_type;
+	typedef value_type& reference;
+	typedef const value_type& const_reference;
+	typedef value_type* pointer;
+	typedef const value_type* const_pointer;
+	typedef int size_type;
+
+	virtual ~DrillCollectionImpl() {}
+
+	// Iterators to the first element and one past the last element, in
+	// mutable and read-only flavours.
+	virtual iterator_ptr begin() = 0;
+	virtual const_iterator_ptr begin() const = 0;
+	virtual iterator_ptr end() = 0;
+	virtual const_iterator_ptr end() const = 0;
+};
+} // namespace impl
+
+template<typename T>
+class DrillCollection;
+
+/**
+ * Value-semantic input iterator over a DrillCollection.
+ *
+ * The iterator is a thin handle around a shared implementation object
+ * (impl::DrillIteratorImpl); copies share that implementation. A
+ * default-constructed iterator holds no implementation: it may be copied,
+ * assigned and compared, but not dereferenced or incremented.
+ */
+template<typename T>
+class DrillIterator: public std::iterator<std::input_iterator_tag, T> {
+public:
+	typedef impl::DrillIteratorImpl<T> Impl;
+	typedef boost::shared_ptr<Impl> ImplPtr;
+
+	typedef DrillIterator<T> iterator;
+	typedef std::iterator<std::input_iterator_tag, T> superclass;
+	typedef typename superclass::reference reference;
+	typedef typename superclass::pointer pointer;
+
+	// Default constructor: an iterator without implementation
+	DrillIterator(): m_pImpl() {};
+	~DrillIterator() {}
+
+	// Iterators are CopyConstructible and CopyAssignable
+	DrillIterator(const iterator& it): m_pImpl(it.m_pImpl) {}
+	iterator& operator=(const iterator& it) {
+		m_pImpl = it.m_pImpl;
+		return *this;
+	}
+
+	// Converting constructor (e.g. iterator -> const_iterator); relies on the
+	// implementation's conversion operator. An iterator without
+	// implementation converts to one without implementation instead of
+	// dereferencing a null pointer.
+	template<typename U>
+	DrillIterator(const DrillIterator<U>& it): m_pImpl() {
+		if (it.m_pImpl) {
+			m_pImpl = static_cast<ImplPtr>(*it.m_pImpl);
+		}
+	}
+
+	reference operator*() const { return m_pImpl->operator*(); }
+	pointer   operator->() const { return m_pImpl->operator->(); }
+
+	iterator& operator++() { m_pImpl->operator++(); return *this; }
+
+	// Two iterators are equal when they share the same implementation (or
+	// both have none), or when their implementations compare equal.
+	bool operator==(const iterator& x) const { 
+		if (m_pImpl == x.m_pImpl) {
+			return true;
+		}
+		// Exactly one side lacks an implementation: never equal, and the
+		// missing side must not be dereferenced.
+		if (!m_pImpl || !x.m_pImpl) {
+			return false;
+		}
+		return m_pImpl->operator==(*x.m_pImpl);
+	}
+
+	bool operator!=(const iterator& x) const { 
+		if (m_pImpl == x.m_pImpl) {
+			return false;
+		}
+		// Exactly one side lacks an implementation: always different.
+		if (!m_pImpl || !x.m_pImpl) {
+			return true;
+		}
+		return m_pImpl->operator!=(*x.m_pImpl);
+	}
+
+private:
+	template<typename U>
+	friend class DrillCollection;
+	template<typename U>
+	friend class DrillIterator;
+
+	ImplPtr m_pImpl;
+
+	// Used by DrillCollection to wrap an implementation iterator.
+	template<typename U>
+	DrillIterator(const boost::shared_ptr<impl::DrillIteratorImpl<U> >& pImpl): m_pImpl(pImpl) {}
+};
+
+/**
+ * STL-like, read-through view over a collection whose storage and iteration
+ * logic live in a shared impl::DrillCollectionImpl object. Subclasses supply
+ * the implementation through the protected constructor.
+ */
+template<typename T>
+class DrillCollection {
+public:
+	typedef impl::DrillCollectionImpl<T> Impl;
+	typedef boost::shared_ptr<Impl> ImplPtr;
+
+	// STL-like typedefs
+	typedef T value_type;
+	typedef value_type& reference;
+	typedef const value_type& const_reference;
+	typedef value_type* pointer;
+	typedef const value_type* const_pointer;
+	typedef int size_type;
+	typedef DrillIterator<T> iterator;
+	typedef DrillIterator<const T> const_iterator;
+
+	// Mutable traversal
+	iterator begin() {
+		Impl& target = *m_pImpl;
+		return iterator(target.begin());
+	}
+	iterator end() {
+		Impl& target = *m_pImpl;
+		return iterator(target.end());
+	}
+
+	// Read-only traversal: go through a const reference so that the const
+	// overloads of the implementation are selected.
+	const_iterator begin() const {
+		const Impl& target = *m_pImpl;
+		return const_iterator(target.begin());
+	}
+	const_iterator end() const {
+		const Impl& target = *m_pImpl;
+		return const_iterator(target.end());
+	}
+
+protected:
+	DrillCollection(const ImplPtr& impl): m_pImpl(impl) {}
+
+	// Direct access to the implementation for subclasses
+	Impl& operator*() { return *m_pImpl; }
+	const Impl& operator*() const { return *m_pImpl; }
+	Impl* operator->() { return m_pImpl.get(); }
+	const Impl* operator->() const { return m_pImpl.get(); }
+
+private:
+	ImplPtr m_pImpl;
+};
+
+
+} /* namespace Drill */
+#endif /* _DRILL_COLLECTIONS_H */

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/include/drill/common.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index a617dc7..6d3816e 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -20,6 +20,24 @@
 #ifndef _COMMON_H_
 #define _COMMON_H_
 
+// DECLSPEC_DRILL_CLIENT marks symbols that belong to the client library's
+// public interface:
+//  - Windows/Cygwin: dllexport when DRILL_CLIENT_EXPORTS is defined, nothing
+//    when USE_STATIC_LIBDRILL is defined, dllimport otherwise.
+//  - elsewhere: default visibility with GCC 4 or later, nothing otherwise.
+#if defined _WIN32 || defined __CYGWIN__
+  #ifdef DRILL_CLIENT_EXPORTS
+      #define DECLSPEC_DRILL_CLIENT __declspec(dllexport)
+  #else
+    #ifdef USE_STATIC_LIBDRILL
+      #define DECLSPEC_DRILL_CLIENT
+    #else
+      #define DECLSPEC_DRILL_CLIENT  __declspec(dllimport)
+    #endif
+  #endif
+#else
+  #if __GNUC__ >= 4
+    #define DECLSPEC_DRILL_CLIENT __attribute__ ((visibility ("default")))
+  #else
+    #define DECLSPEC_DRILL_CLIENT
+  #endif
+#endif
+
 #ifdef _WIN32
 // The order of inclusion is important. Including winsock2 before everything else
 // ensures that the correct typedefs are defined and that the older typedefs defined


Mime
View raw message