hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject [51/56] [abbrv] hadoop git commit: HDFS-8775. SASL support for data transfer protocol in libhdfspp. Contributed by Haohui Mai.
Date Wed, 07 Oct 2015 07:20:17 GMT
HDFS-8775. SASL support for data transfer protocol in libhdfspp. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3830732e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3830732e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3830732e

Branch: refs/heads/HDFS-8707
Commit: 3830732eb354240d4ef17a49051370f742e6716e
Parents: 319476e
Author: Haohui Mai <wheat9@apache.org>
Authored: Mon Jul 20 14:31:01 2015 -0700
Committer: Haohui Mai <wheat9@apache.org>
Committed: Wed Oct 7 00:17:12 2015 -0700

----------------------------------------------------------------------
 .../native/libhdfspp/lib/common/CMakeLists.txt  |   2 +-
 .../lib/common/continuation/protobuf.h          |   2 +
 .../libhdfspp/lib/common/sasl_authenticator.h   |  66 +++++
 .../libhdfspp/lib/common/sasl_digest_md5.cc     | 240 +++++++++++++++++++
 .../native/libhdfspp/lib/reader/CMakeLists.txt  |   2 +-
 .../native/libhdfspp/lib/reader/datatransfer.cc |  52 ++++
 .../native/libhdfspp/lib/reader/datatransfer.h  |  34 ++-
 .../libhdfspp/lib/reader/datatransfer_impl.h    | 144 +++++++++++
 .../main/native/libhdfspp/tests/CMakeLists.txt  |   7 +-
 .../libhdfspp/tests/remote_block_reader_test.cc |  52 +++-
 .../libhdfspp/tests/sasl_digest_md5_test.cc     |  44 ++++
 11 files changed, 635 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3830732e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
index 570d0ac..cea5e0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -1 +1 @@
-add_library(common base64.cc status.cc)
+add_library(common base64.cc status.cc sasl_digest_md5.cc)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3830732e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
index 3e4b535..d30322c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
@@ -20,6 +20,8 @@
 
 #include "common/util.h"
 
+#include <asio/read.hpp>
+
 #include <google/protobuf/message_lite.h>
 #include <google/protobuf/io/coded_stream.h>
 #include <google/protobuf/io/zero_copy_stream_impl_lite.h>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3830732e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h
new file mode 100644
index 0000000..71fee7a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_authenticator.h
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_SASL_AUTHENTICATOR_H_
+#define LIB_COMMON_SASL_AUTHENTICATOR_H_
+
+#include "libhdfspp/status.h"
+
+namespace hdfs {
+
+class DigestMD5AuthenticatorTest_TestResponse_Test;
+
+/**
+ * A specialized implementation of RFC 2831 for the HDFS
+ * DataTransferProtocol.
+ *
+ * The current lacks the following features:
+ *   * Encoding the username, realm, and password in ISO-8859-1 when
+ * it is required by the RFC. They are always encoded in UTF-8.
+ *   * Checking whether the challenges from the server are
+ * well-formed.
+ *   * Specifying authzid, digest-uri and maximum buffer size.
+ *   * Supporting QOP other than the auth level.
+ **/
+class DigestMD5Authenticator {
+public:
+  Status EvaluateResponse(const std::string &payload, std::string *result);
+  DigestMD5Authenticator(const std::string &username,
+                         const std::string &password, bool mock_nonce = false);
+
+private:
+  Status GenerateFirstResponse(std::string *result);
+  Status GenerateResponseValue(std::string *response_value);
+  Status ParseFirstChallenge(const std::string &payload);
+
+  static size_t NextToken(const std::string &payload, size_t off,
+                          std::string *tok);
+  void GenerateCNonce();
+  std::string username_;
+  std::string password_;
+  std::string nonce_;
+  std::string cnonce_;
+  std::string realm_;
+  std::string qop_;
+  unsigned nonce_count_;
+
+  const bool TEST_mock_cnonce_;
+  friend class DigestMD5AuthenticatorTest_TestResponse_Test;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3830732e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc
new file mode 100644
index 0000000..6f53e28
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/sasl_digest_md5.cc
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "sasl_authenticator.h"
+
+#include "common/util.h"
+
+#include <openssl/rand.h>
+#include <openssl/md5.h>
+
+#include <iomanip>
+#include <map>
+#include <sstream>
+
+namespace hdfs {
+
+static std::string QuoteString(const std::string &src);
+static std::string GetMD5Digest(const std::string &src);
+static std::string BinaryToHex(const std::string &src);
+
+static const char kDigestUri[] = "hdfs/0";
+static const size_t kMaxBufferSize = 65536;
+
+DigestMD5Authenticator::DigestMD5Authenticator(const std::string &username,
+                                               const std::string &password,
+                                               bool mock_nonce)
+    : username_(username), password_(password), nonce_count_(0),
+      TEST_mock_cnonce_(mock_nonce) {}
+
+Status DigestMD5Authenticator::EvaluateResponse(const std::string &payload,
+                                                std::string *result) {
+  Status status = ParseFirstChallenge(payload);
+  if (status.ok()) {
+    status = GenerateFirstResponse(result);
+  }
+  return status;
+}
+
+size_t DigestMD5Authenticator::NextToken(const std::string &payload, size_t off,
+                                         std::string *tok) {
+  tok->clear();
+  if (off >= payload.size()) {
+    return std::string::npos;
+  }
+
+  char c = payload[off];
+  if (c == '=' || c == ',') {
+    *tok = c;
+    return off + 1;
+  }
+
+  int quote_count = 0;
+  for (; off < payload.size(); ++off) {
+    char c = payload[off];
+    if (c == '"') {
+      ++quote_count;
+      if (quote_count == 2) {
+        return off + 1;
+      }
+      continue;
+    }
+
+    if (c == '=') {
+      if (quote_count) {
+        tok->append(&c, 1);
+      } else {
+        break;
+      }
+    } else if (('0' <= c && c <= '9') || ('a' <= c && c <= 'z')
||
+               ('A' <= c && c <= 'Z') || c == '+' || c == '/' || c == '-' ||
+               c == '_' || c == '@') {
+      tok->append(&c, 1);
+    } else {
+      break;
+    }
+  }
+  return off;
+}
+
+void DigestMD5Authenticator::GenerateCNonce() {
+  if (!TEST_mock_cnonce_) {
+    char buf[8];
+    RAND_pseudo_bytes(reinterpret_cast<unsigned char *>(buf), sizeof(buf));
+    cnonce_ = Base64Encode(std::string(buf, sizeof(buf)));
+  }
+}
+
+Status DigestMD5Authenticator::ParseFirstChallenge(const std::string &payload) {
+  std::map<std::string, std::string> props;
+  std::string token;
+  enum {
+    kStateLVal,
+    kStateEqual,
+    kStateRVal,
+    kStateCommaOrEnd,
+  };
+
+  int state = kStateLVal;
+
+  std::string lval, rval;
+  size_t off = 0;
+  while (true) {
+    off = NextToken(payload, off, &token);
+    if (off == std::string::npos) {
+      break;
+    }
+
+    switch (state) {
+    case kStateLVal:
+      lval = token;
+      state = kStateEqual;
+      break;
+    case kStateEqual:
+      state = kStateRVal;
+      break;
+    case kStateRVal:
+      rval = token;
+      props[lval] = rval;
+      state = kStateCommaOrEnd;
+      break;
+    case kStateCommaOrEnd:
+      state = kStateLVal;
+      break;
+    }
+  }
+
+  if (props["algorithm"] != "md5-sess" || props["charset"] != "utf-8" ||
+      props.find("nonce") == props.end()) {
+    return Status::Error("Invalid challenge");
+  }
+  realm_ = props["realm"];
+  nonce_ = props["nonce"];
+  qop_ = props["qop"];
+  return Status::OK();
+}
+
+Status DigestMD5Authenticator::GenerateFirstResponse(std::string *result) {
+  // TODO: Support auth-int and auth-conf
+  // Handle cipher
+  if (qop_ != "auth") {
+    return Status::Unimplemented();
+  }
+
+  std::stringstream ss;
+  GenerateCNonce();
+  ss << "charset=utf-8,username=\"" << QuoteString(username_) << "\""
+     << ",authzid=\"" << QuoteString(username_) << "\""
+     << ",nonce=\"" << QuoteString(nonce_) << "\""
+     << ",digest-uri=\"" << kDigestUri << "\""
+     << ",maxbuf=" << kMaxBufferSize << ",cnonce=\"" << cnonce_ <<
"\"";
+
+  if (realm_.size()) {
+    ss << ",realm=\"" << QuoteString(realm_) << "\"";
+  }
+
+  ss << ",nc=" << std::hex << std::setw(8) << std::setfill('0')
+     << ++nonce_count_;
+  std::string response_value;
+  GenerateResponseValue(&response_value);
+  ss << ",response=" << response_value;
+  *result = ss.str();
+  return result->size() > 4096 ? Status::Error("Response too big")
+                               : Status::OK();
+}
+
+/**
+ * Generate the response value specified in S 2.1.2.1 in RFC2831.
+ **/
+Status
+DigestMD5Authenticator::GenerateResponseValue(std::string *response_value) {
+  std::stringstream begin_a1, a1_ss;
+  std::string a1, a2;
+
+  if (qop_ == "auth") {
+    a2 = std::string("AUTHENTICATE:") + kDigestUri;
+  } else {
+    a2 = std::string("AUTHENTICATE:") + kDigestUri +
+         ":00000000000000000000000000000000";
+  }
+
+  begin_a1 << username_ << ":" << realm_ << ":" << password_;
+  a1_ss << GetMD5Digest(begin_a1.str()) << ":" << nonce_ << ":" <<
cnonce_
+        << ":" << username_;
+
+  std::stringstream combine_ss;
+  combine_ss << BinaryToHex(GetMD5Digest(a1_ss.str())) << ":" << nonce_
<< ":"
+             << std::hex << std::setw(8) << std::setfill('0') <<
nonce_count_
+             << ":" << cnonce_ << ":" << qop_ << ":"
+             << BinaryToHex(GetMD5Digest(a2));
+  *response_value = BinaryToHex(GetMD5Digest(combine_ss.str()));
+  return Status::OK();
+}
+
+static std::string QuoteString(const std::string &src) {
+  std::string dst;
+  dst.resize(2 * src.size());
+  size_t j = 0;
+  for (size_t i = 0; i < src.size(); ++i) {
+    if (src[i] == '"') {
+      dst[j++] = '\\';
+    }
+    dst[j++] = src[i];
+  }
+  dst.resize(j);
+  return dst;
+}
+
+static std::string GetMD5Digest(const std::string &src) {
+  MD5_CTX ctx;
+  unsigned long long res[2];
+  MD5_Init(&ctx);
+  MD5_Update(&ctx, src.c_str(), src.size());
+  MD5_Final(reinterpret_cast<unsigned char *>(res), &ctx);
+  return std::string(reinterpret_cast<char *>(res), sizeof(res));
+}
+
+static std::string BinaryToHex(const std::string &src) {
+  std::stringstream ss;
+  ss << std::hex << std::setfill('0');
+  for (size_t i = 0; i < src.size(); ++i) {
+    unsigned c = (unsigned)(static_cast<unsigned char>(src[i]));
+    ss << std::setw(2) << c;
+  }
+  return ss.str();
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3830732e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
index 65ec108..71e28ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
@@ -16,5 +16,5 @@
 # limitations under the License.
 #
 
-add_library(reader remote_block_reader.cc)
+add_library(reader remote_block_reader.cc datatransfer.cc)
 add_dependencies(reader proto)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3830732e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc
new file mode 100644
index 0000000..d936407
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "datatransfer.h"
+
+#include "libhdfspp/status.h"
+
+namespace hdfs {
+
+namespace DataTransferSaslStreamUtil {
+
+static const auto kSUCCESS = hadoop::hdfs::DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS;
+
+using hadoop::hdfs::DataTransferEncryptorMessageProto;
+
+Status ConvertToStatus(const DataTransferEncryptorMessageProto *msg, std::string *payload)
{
+  using namespace hadoop::hdfs;
+  auto s = msg->status();
+  if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR_UNKNOWN_KEY)
{
+    payload->clear();
+    return Status::Exception("InvalidEncryptionKeyException", msg->message().c_str());
+  } else if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR) {
+    payload->clear();
+    return Status::Error(msg->message().c_str());
+  } else {
+    *payload = msg->payload();
+    return Status::OK();
+  }
+}
+
+void PrepareInitialHandshake(DataTransferEncryptorMessageProto *msg) {
+  msg->set_status(kSUCCESS);
+  msg->set_payload("");
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3830732e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
index d22f5e8..511c2eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
@@ -15,8 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef COMMON_DATA_TRANSFER_H_
-#define COMMON_DATA_TRANSFER_H_
+#ifndef LIB_READER_DATA_TRANSFER_H_
+#define LIB_READER_DATA_TRANSFER_H_
+
+#include "common/sasl_authenticator.h"
 
 namespace hdfs {
 
@@ -27,9 +29,35 @@ enum {
 
 enum Operation {
   kWriteBlock = 80,
-  kReadBlock  = 81,
+  kReadBlock = 81,
 };
 
+template <class Stream> class DataTransferSaslStream {
+public:
+  DataTransferSaslStream(Stream *stream, const std::string &username,
+                         const std::string &password)
+      : stream_(stream), authenticator_(username, password) {}
+
+  template <class Handler> void Handshake(const Handler &next);
+
+  template <class MutableBufferSequence, class ReadHandler>
+  void async_read_some(const MutableBufferSequence &buffers,
+                       ReadHandler &&handler);
+
+  template <class ConstBufferSequence, class WriteHandler>
+  void async_write_some(const ConstBufferSequence &buffers,
+                        WriteHandler &&handler);
+
+private:
+  DataTransferSaslStream(const DataTransferSaslStream &) = delete;
+  DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete;
+  Stream *stream_;
+  DigestMD5Authenticator authenticator_;
+  struct ReadSaslMessage;
+  struct Authenticator;
+};
 }
 
+#include "datatransfer_impl.h"
+
 #endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3830732e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
new file mode 100644
index 0000000..088b86e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_READER_DATATRANFER_IMPL_H_
+#define LIB_READER_DATATRANFER_IMPL_H_
+
+#include "datatransfer.pb.h"
+#include "common/continuation/continuation.h"
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <asio/read.hpp>
+#include <asio/buffer.hpp>
+
+namespace hdfs {
+
+namespace DataTransferSaslStreamUtil {
+Status
+ConvertToStatus(const ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg,
+                std::string *payload);
+void PrepareInitialHandshake(
+    ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg);
+}
+
+template <class Stream>
+struct DataTransferSaslStream<Stream>::Authenticator
+    : continuation::Continuation {
+  Authenticator(DigestMD5Authenticator *authenticator,
+                const std::string *request,
+                hadoop::hdfs::DataTransferEncryptorMessageProto *msg)
+      : authenticator_(authenticator), request_(request), msg_(msg) {}
+
+  virtual void Run(const Next &next) override {
+    using namespace ::hadoop::hdfs;
+    std::string response;
+    Status status = authenticator_->EvaluateResponse(*request_, &response);
+    msg_->Clear();
+    if (status.ok()) {
+      // TODO: Handle encryption scheme
+      msg_->set_payload(response);
+      msg_->set_status(
+          DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
+    } else {
+      msg_->set_status(
+          DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR);
+    }
+    next(Status::OK());
+  }
+
+private:
+  DigestMD5Authenticator *authenticator_;
+  const std::string *request_;
+  hadoop::hdfs::DataTransferEncryptorMessageProto *msg_;
+};
+
+template <class Stream>
+struct DataTransferSaslStream<Stream>::ReadSaslMessage
+    : continuation::Continuation {
+  ReadSaslMessage(Stream *stream, std::string *data)
+      : stream_(stream), data_(data), read_pb_(stream, &resp_) {}
+
+  virtual void Run(const Next &next) override {
+    auto handler = [this, next](const Status &status) {
+      if (status.ok()) {
+        Status new_stat =
+            DataTransferSaslStreamUtil::ConvertToStatus(&resp_, data_);
+        next(new_stat);
+      } else {
+        next(status);
+      }
+    };
+    read_pb_.Run(handler);
+  }
+
+private:
+  Stream *stream_;
+  std::string *data_;
+  hadoop::hdfs::DataTransferEncryptorMessageProto resp_;
+  continuation::ReadDelimitedPBMessageContinuation<Stream, 1024> read_pb_;
+};
+
+template <class Stream>
+template <class Handler>
+void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
+  using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
+  using ::hdfs::continuation::Write;
+  using ::hdfs::continuation::WriteDelimitedPBMessage;
+
+  static const int kMagicNumber = htonl(kDataTransferSasl);
+  static const asio::const_buffers_1 kMagicNumberBuffer = asio::buffer(
+      reinterpret_cast<const char *>(kMagicNumber), sizeof(kMagicNumber));
+
+  struct State {
+    DataTransferEncryptorMessageProto req0;
+    std::string resp0;
+    DataTransferEncryptorMessageProto req1;
+    std::string resp1;
+    Stream *stream;
+  };
+  auto m = continuation::Pipeline<State>::Create();
+  State *s = &m->state();
+  s->stream = stream_;
+
+  DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0);
+
+  m->Push(Write(stream_, kMagicNumberBuffer))
+      .Push(WriteDelimitedPBMessage(stream_, &s->req0))
+      .Push(new ReadSaslMessage(stream_, &s->resp0))
+      .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1))
+      .Push(WriteDelimitedPBMessage(stream_, &s->req1))
+      .Push(new ReadSaslMessage(stream_, &s->resp1));
+  m->Run([next](const Status &status, const State &) { next(status); });
+}
+
+template <class Stream>
+template <class MutableBufferSequence, class ReadHandler>
+void DataTransferSaslStream<Stream>::async_read_some(
+    const MutableBufferSequence &buffers, ReadHandler &&handler) {
+  stream_->async_read_some(buffers, handler);
+}
+
+template <class Stream>
+template <typename ConstBufferSequence, typename WriteHandler>
+void DataTransferSaslStream<Stream>::async_write_some(
+    const ConstBufferSequence &buffers, WriteHandler &&handler) {
+  stream_->async_write_some(buffers, handler);
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3830732e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt
index cd5e1b1..53f340d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt
@@ -17,6 +17,11 @@
 #
 
 add_library(test_common OBJECT mock_connection.cc)
+
 add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>)
-target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES}
gmock_main)
+target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES}
${OPENSSL_LIBRARIES} gmock_main)
 add_test(remote_block_reader remote_block_reader_test)
+
+add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc)
+target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main)
+add_test(sasl_digest_md5 sasl_digest_md5_test)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3830732e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
index 92cbc8f..5307d39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
@@ -32,6 +32,7 @@ using namespace hdfs;
 using ::hadoop::common::TokenProto;
 using ::hadoop::hdfs::BlockOpResponseProto;
 using ::hadoop::hdfs::ChecksumProto;
+using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
 using ::hadoop::hdfs::ExtendedBlockProto;
 using ::hadoop::hdfs::PacketHeaderProto;
 using ::hadoop::hdfs::ReadOpChecksumInfoProto;
@@ -90,13 +91,14 @@ ProducePacket(const std::string &data, const std::string &checksum,
   return std::make_pair(error_code(), std::move(payload));
 }
 
-static std::shared_ptr<RemoteBlockReader<MockDNConnection>>
-ReadContent(MockDNConnection *conn, TokenProto *token,
+template<class Stream = MockDNConnection>
+static std::shared_ptr<RemoteBlockReader<Stream>>
+ReadContent(Stream *conn, TokenProto *token,
             const ExtendedBlockProto &block, uint64_t length, uint64_t offset,
             const mutable_buffers_1 &buf, Status *status, size_t *transferred) {
   BlockReaderOptions options;
   auto reader =
-      std::make_shared<RemoteBlockReader<MockDNConnection>>(options, conn);
+      std::make_shared<RemoteBlockReader<Stream>>(options, conn);
   Status result;
   reader->async_connect(
       "libhdfs++", token, &block, length, offset,
@@ -121,7 +123,6 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
   BlockOpResponseProto block_op_resp;
 
   block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
-
   EXPECT_CALL(conn, Produce())
       .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
       .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
@@ -205,6 +206,49 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
   });
 }
 
+TEST(RemoteBlockReaderTest, TestSaslConnection) {
+  static const size_t kChunkSize = 512;
+  static const string kChunkData(kChunkSize, 'a');
+  static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
+                                     "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
+                                     "charset=utf-8,algorithm=md5-sess";
+  MockDNConnection conn;
+  BlockOpResponseProto block_op_resp;
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  DataTransferEncryptorMessageProto sasl_resp0, sasl_resp1;
+  sasl_resp0.set_status(
+      ::hadoop::hdfs::
+          DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
+  sasl_resp0.set_payload(kAuthPayload);
+  sasl_resp1.set_status(
+      ::hadoop::hdfs::
+          DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
+
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0))))
+      .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1))))
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
+
+  DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar");
+  ExtendedBlockProto block;
+  std::string data(kChunkSize, 0);
+  size_t transferred = 0;
+  Status stat;
+  sasl_conn.Handshake([&stat](const Status &s) {
+      stat = s;
+    });
+
+  ASSERT_TRUE(stat.ok());
+  ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0,
+              buffer(const_cast<char *>(data.c_str()), data.size()), &stat,
+              &transferred);
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(kChunkSize, transferred);
+  ASSERT_EQ(kChunkData, data);
+}
+
 int main(int argc, char *argv[]) {
   // The following line must be executed to initialize Google Mock
   // (and Google Test) before running the tests.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3830732e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc
new file mode 100644
index 0000000..0797853
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "common/sasl_authenticator.h"
+
+#include <gtest/gtest.h>
+
+namespace hdfs {
+
+/**
+ * Testing whether the authenticator generates the MD5 digest correctly.
+ **/
+TEST(DigestMD5AuthenticatorTest, TestResponse) {
+  const std::string username = "igFLnEx4OIx5PZWHAAAABGhtYWkAAAAoQlAtMTM3MDQ2OTk"
+                               "zLTE5Mi4xNjguMS4yMjctMTQyNDIyMDM4MTM2M4xAAAABAQ"
+                               "RSRUFE";
+  const std::string password = "K5IFUibAynVVrApeCXLrBk9Sro8=";
+  DigestMD5Authenticator auth(username, password, true);
+  auth.cnonce_ = "KQlJwBDTseCHpAkFLZls4WcAktp6r5wTzje5feLY";
+  std::string result;
+  Status status =
+      auth.EvaluateResponse("realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
+                            "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\",charset="
+                            "utf-8,algorithm=md5-sess",
+                            &result);
+  ASSERT_TRUE(status.ok());
+  ASSERT_TRUE(result.find("response=3a286c2c385b92a06ebc66d58b8c4330") !=
+              std::string::npos);
+}
+}


Mime
View raw message