hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject [2/3] hadoop git commit: HDFS-9002. Move o.a.h.hdfs.net/*Peer classes to hdfs-client. Contributed by Mingliang Liu.
Date Thu, 03 Sep 2015 22:35:06 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index d921507..1e561cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocolPB;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedInputStream;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -264,4 +266,104 @@ public class PBHelperClient {
     assert size >= 0;
     return new ExactSizeInputStream(input, size);
   }
+
+  public static CipherOption convert(HdfsProtos.CipherOptionProto proto) {
+    if (proto != null) {
+      CipherSuite suite = null;
+      if (proto.getSuite() != null) {
+        suite = convert(proto.getSuite());
+      }
+      byte[] inKey = null;
+      if (proto.getInKey() != null) {
+        inKey = proto.getInKey().toByteArray();
+      }
+      byte[] inIv = null;
+      if (proto.getInIv() != null) {
+        inIv = proto.getInIv().toByteArray();
+      }
+      byte[] outKey = null;
+      if (proto.getOutKey() != null) {
+        outKey = proto.getOutKey().toByteArray();
+      }
+      byte[] outIv = null;
+      if (proto.getOutIv() != null) {
+        outIv = proto.getOutIv().toByteArray();
+      }
+      return new CipherOption(suite, inKey, inIv, outKey, outIv);
+    }
+    return null;
+  }
+
+  public static CipherSuite convert(HdfsProtos.CipherSuiteProto proto) {
+    switch (proto) {
+    case AES_CTR_NOPADDING:
+      return CipherSuite.AES_CTR_NOPADDING;
+    default:
+      // Set to UNKNOWN and stash the unknown enum value
+      CipherSuite suite = CipherSuite.UNKNOWN;
+      suite.setUnknownValue(proto.getNumber());
+      return suite;
+    }
+  }
+
+  public static HdfsProtos.CipherOptionProto convert(CipherOption option) {
+    if (option != null) {
+      HdfsProtos.CipherOptionProto.Builder builder = HdfsProtos.CipherOptionProto.
+          newBuilder();
+      if (option.getCipherSuite() != null) {
+        builder.setSuite(convert(option.getCipherSuite()));
+      }
+      if (option.getInKey() != null) {
+        builder.setInKey(ByteString.copyFrom(option.getInKey()));
+      }
+      if (option.getInIv() != null) {
+        builder.setInIv(ByteString.copyFrom(option.getInIv()));
+      }
+      if (option.getOutKey() != null) {
+        builder.setOutKey(ByteString.copyFrom(option.getOutKey()));
+      }
+      if (option.getOutIv() != null) {
+        builder.setOutIv(ByteString.copyFrom(option.getOutIv()));
+      }
+      return builder.build();
+    }
+    return null;
+  }
+
+  public static HdfsProtos.CipherSuiteProto convert(CipherSuite suite) {
+    switch (suite) {
+    case UNKNOWN:
+      return HdfsProtos.CipherSuiteProto.UNKNOWN;
+    case AES_CTR_NOPADDING:
+      return HdfsProtos.CipherSuiteProto.AES_CTR_NOPADDING;
+    default:
+      return null;
+    }
+  }
+
+  public static List<HdfsProtos.CipherOptionProto> convertCipherOptions(
+      List<CipherOption> options) {
+    if (options != null) {
+      List<HdfsProtos.CipherOptionProto> protos =
+          Lists.newArrayListWithCapacity(options.size());
+      for (CipherOption option : options) {
+        protos.add(convert(option));
+      }
+      return protos;
+    }
+    return null;
+  }
+
+  public static List<CipherOption> convertCipherOptionProtos(
+      List<HdfsProtos.CipherOptionProto> protos) {
+    if (protos != null) {
+      List<CipherOption> options =
+          Lists.newArrayListWithCapacity(protos.size());
+      for (HdfsProtos.CipherOptionProto proto : protos) {
+        options.add(convert(proto));
+      }
+      return options;
+    }
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ecc65cc..077fa98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -538,6 +538,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8890. Allow admin to specify which blockpools the balancer should run
     on. (Chris Trezzo via mingma)
 
+    HDFS-9002. Move o.a.h.hdfs.net/*Peer classes to hdfs-client.
+    (Mingliang Liu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 5403955..69a8532 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -103,7 +103,6 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -3077,7 +3076,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     try {
       sock = socketFactory.createSocket();
       NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), socketTimeout);
-      peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
+      peer = DFSUtilClient.peerFromSocketAndKey(saslClient, sock, this,
           blockToken, datanodeId);
       peer.setReadTimeout(socketTimeout);
       peer.setWriteTimeout(socketTimeout);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index cf9c11d..545ca6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -597,14 +597,28 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // Security-related configs
   public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
   public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
-  public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY = "dfs.encrypt.data.transfer.cipher.key.bitlength";
-  public static final int    DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128;
-  public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites";
+  @Deprecated
+  public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY =
+      HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
+  @Deprecated
+  public static final int    DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT =
+      HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
+  @Deprecated
+  public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
+      HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
   public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
-  public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
-  public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
-  public static final String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
-  public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
+  @Deprecated
+  public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS =
+      HdfsClientConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS;
+  @Deprecated
+  public static final String DFS_DATA_TRANSFER_PROTECTION_KEY =
+      HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+  @Deprecated
+  public static final String DFS_DATA_TRANSFER_PROTECTION_DEFAULT =
+      HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
+  @Deprecated
+  public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY =
+      HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
   public static final int    DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
   public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
   public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
deleted file mode 100644
index a9f33e7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.net;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.channels.ReadableByteChannel;
-
-import org.apache.hadoop.net.unix.DomainSocket;
-
-/**
- * Represents a peer that we communicate with by using a basic Socket
- * that has no associated Channel.
- *
- */
-class BasicInetPeer implements Peer {
-  private final Socket socket;
-  private final OutputStream out;
-  private final InputStream in;
-  private final boolean isLocal;
-
-  public BasicInetPeer(Socket socket) throws IOException {
-    this.socket = socket;
-    this.out = socket.getOutputStream();
-    this.in = socket.getInputStream();
-    this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
-  }
-
-  @Override
-  public ReadableByteChannel getInputStreamChannel() {
-    /*
-     * This Socket has no channel, so there's nothing to return here.
-     */
-    return null;
-  }
-
-  @Override
-  public void setReadTimeout(int timeoutMs) throws IOException {
-    socket.setSoTimeout(timeoutMs);
-  }
-
-  @Override
-  public int getReceiveBufferSize() throws IOException {
-    return socket.getReceiveBufferSize();
-  }
-
-  @Override
-  public boolean getTcpNoDelay() throws IOException {
-    return socket.getTcpNoDelay();
-  }
-
-  @Override
-  public void setWriteTimeout(int timeoutMs) {
-   /* 
-    * We can't implement write timeouts. :(
-    * 
-    * Java provides no facility to set a blocking write timeout on a Socket.
-    * You can simulate a blocking write with a timeout by using
-    * non-blocking I/O.  However, we can't use nio here, because this Socket
-    * doesn't have an associated Channel.
-    * 
-    * See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4031100 for
-    * more details.
-    */
-  }
-
-  @Override
-  public boolean isClosed() {
-    return socket.isClosed();
-  }
-
-  @Override
-  public void close() throws IOException {
-    socket.close();
-  }
-
-  @Override
-  public String getRemoteAddressString() {
-    return socket.getRemoteSocketAddress().toString();
-  }
-
-  @Override
-  public String getLocalAddressString() {
-    return socket.getLocalSocketAddress().toString();
-  }
-  
-  @Override
-  public InputStream getInputStream() throws IOException {
-    return in;
-  }
-
-  @Override
-  public OutputStream getOutputStream() throws IOException {
-    return out;
-  }
-
-  @Override
-  public boolean isLocal() {
-    return isLocal;
-  }
-
-  @Override
-  public String toString() {
-    return "BasicInetPeer(" + socket.toString() + ")";
-  }
-
-  @Override
-  public DomainSocket getDomainSocket() {
-    return null;
-  }
-
-  @Override
-  public boolean hasSecureChannel() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
deleted file mode 100644
index da660c7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.net;
-
-import java.io.IOException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.net.unix.DomainSocket;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.channels.ReadableByteChannel;
-
-/**
- * Represents a peer that we communicate with by using an encrypted
- * communications medium.
- */
-@InterfaceAudience.Private
-public class EncryptedPeer implements Peer {
-  private final Peer enclosedPeer;
-
-  /**
-   * An encrypted InputStream.
-   */
-  private final InputStream in;
-  
-  /**
-   * An encrypted OutputStream.
-   */
-  private final OutputStream out;
-  
-  /**
-   * An encrypted ReadableByteChannel.
-   */
-  private final ReadableByteChannel channel;
-
-  public EncryptedPeer(Peer enclosedPeer, IOStreamPair ios) {
-    this.enclosedPeer = enclosedPeer;
-    this.in = ios.in;
-    this.out = ios.out;
-    this.channel = ios.in instanceof ReadableByteChannel ? 
-        (ReadableByteChannel)ios.in : null;
-  }
-
-  @Override
-  public ReadableByteChannel getInputStreamChannel() {
-    return channel;
-  }
-
-  @Override
-  public void setReadTimeout(int timeoutMs) throws IOException {
-    enclosedPeer.setReadTimeout(timeoutMs);
-  }
-
-  @Override
-  public int getReceiveBufferSize() throws IOException {
-    return enclosedPeer.getReceiveBufferSize();
-  }
-
-  @Override
-  public boolean getTcpNoDelay() throws IOException {
-    return enclosedPeer.getTcpNoDelay();
-  }
-
-  @Override
-  public void setWriteTimeout(int timeoutMs) throws IOException {
-    enclosedPeer.setWriteTimeout(timeoutMs);
-  }
-
-  @Override
-  public boolean isClosed() {
-    return enclosedPeer.isClosed();
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      in.close();
-    } finally {
-      try {
-        out.close();
-      } finally {
-        enclosedPeer.close();
-      }
-    }
-  }
-
-  @Override
-  public String getRemoteAddressString() {
-    return enclosedPeer.getRemoteAddressString();
-  }
-
-  @Override
-  public String getLocalAddressString() {
-    return enclosedPeer.getLocalAddressString();
-  }
-
-  @Override
-  public InputStream getInputStream() throws IOException {
-    return in;
-  }
-
-  @Override
-  public OutputStream getOutputStream() throws IOException {
-    return out;
-  }
-
-  @Override
-  public boolean isLocal() {
-    return enclosedPeer.isLocal();
-  }
-
-  @Override
-  public String toString() {
-    return "EncryptedPeer(" + enclosedPeer + ")";
-  }
-
-  @Override
-  public DomainSocket getDomainSocket() {
-    return enclosedPeer.getDomainSocket();
-  }
-
-  @Override
-  public boolean hasSecureChannel() {
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
deleted file mode 100644
index 5bb4f56..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.net;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.channels.ReadableByteChannel;
-
-import org.apache.hadoop.net.SocketInputStream;
-import org.apache.hadoop.net.SocketOutputStream;
-import org.apache.hadoop.net.unix.DomainSocket;
-
-/**
- * Represents a peer that we communicate with by using non-blocking I/O 
- * on a Socket.
- */
-class NioInetPeer implements Peer {
-  private final Socket socket;
-
-  /**
-   * An InputStream which simulates blocking I/O with timeouts using NIO.
-   */
-  private final SocketInputStream in;
-  
-  /**
-   * An OutputStream which simulates blocking I/O with timeouts using NIO.
-   */
-  private final SocketOutputStream out;
-
-  private final boolean isLocal;
-
-  NioInetPeer(Socket socket) throws IOException {
-    this.socket = socket;
-    this.in = new SocketInputStream(socket.getChannel(), 0);
-    this.out = new SocketOutputStream(socket.getChannel(), 0);
-    this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
-  }
-
-  @Override
-  public ReadableByteChannel getInputStreamChannel() {
-    return in;
-  }
-
-  @Override
-  public void setReadTimeout(int timeoutMs) throws IOException {
-    in.setTimeout(timeoutMs);
-  }
-
-  @Override
-  public int getReceiveBufferSize() throws IOException {
-    return socket.getReceiveBufferSize();
-  }
-
-  @Override
-  public boolean getTcpNoDelay() throws IOException {
-    return socket.getTcpNoDelay();
-  }
-
-  @Override
-  public void setWriteTimeout(int timeoutMs) throws IOException {
-    out.setTimeout(timeoutMs);
-  }
-
-  @Override
-  public boolean isClosed() {
-    return socket.isClosed();
-  }
-
-  @Override
-  public void close() throws IOException {
-    // We always close the outermost streams-- in this case, 'in' and 'out'
-    // Closing either one of these will also close the Socket.
-    try {
-      in.close();
-    } finally {
-      out.close();
-    }
-  }
-
-  @Override
-  public String getRemoteAddressString() {
-    return socket.getRemoteSocketAddress().toString();
-  }
-
-  @Override
-  public String getLocalAddressString() {
-    return socket.getLocalSocketAddress().toString();
-  }
-
-  @Override
-  public InputStream getInputStream() throws IOException {
-    return in;
-  }
-
-  @Override
-  public OutputStream getOutputStream() throws IOException {
-    return out;
-  }
-
-  @Override
-  public boolean isLocal() {
-    return isLocal;
-  }
-
-  @Override
-  public String toString() {
-    return "NioInetPeer(" + socket.toString() + ")";
-  }
-
-  @Override
-  public DomainSocket getDomainSocket() {
-    return null;
-  }
-
-  @Override
-  public boolean hasSecureChannel() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
index 2a547e0..e31e46a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
@@ -20,22 +20,15 @@ package org.apache.hadoop.hdfs.net;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.security.token.Token;
 
 @InterfaceAudience.Private
 public class TcpPeerServer implements PeerServer {
@@ -43,60 +36,6 @@ public class TcpPeerServer implements PeerServer {
 
   private final ServerSocket serverSocket;
 
-  public static Peer peerFromSocket(Socket socket)
-      throws IOException {
-    Peer peer = null;
-    boolean success = false;
-    try {
-      // TCP_NODELAY is crucial here because of bad interactions between
-      // Nagle's Algorithm and Delayed ACKs. With connection keepalive
-      // between the client and DN, the conversation looks like:
-      //   1. Client -> DN: Read block X
-      //   2. DN -> Client: data for block X
-      //   3. Client -> DN: Status OK (successful read)
-      //   4. Client -> DN: Read block Y
-      // The fact that step #3 and #4 are both in the client->DN direction
-      // triggers Nagling. If the DN is using delayed ACKs, this results
-      // in a delay of 40ms or more.
-      //
-      // TCP_NODELAY disables nagling and thus avoids this performance
-      // disaster.
-      socket.setTcpNoDelay(true);
-      SocketChannel channel = socket.getChannel();
-      if (channel == null) {
-        peer = new BasicInetPeer(socket);
-      } else {
-        peer = new NioInetPeer(socket);
-      }
-      success = true;
-      return peer;
-    } finally {
-      if (!success) {
-        if (peer != null) peer.close();
-        socket.close();
-      }
-    }
-  }
-
-  public static Peer peerFromSocketAndKey(
-        SaslDataTransferClient saslClient, Socket s,
-        DataEncryptionKeyFactory keyFactory,
-        Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
-        throws IOException {
-    Peer peer = null;
-    boolean success = false;
-    try {
-      peer = peerFromSocket(s);
-      peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
-      success = true;
-      return peer;
-    } finally {
-      if (!success) {
-        IOUtils.cleanup(null, peer);
-      }
-    }
-  }
-
   /**
    * Create a non-secure TcpPeerServer.
    *
@@ -136,7 +75,7 @@ public class TcpPeerServer implements PeerServer {
 
   @Override
   public Peer accept() throws IOException, SocketTimeoutException {
-    Peer peer = peerFromSocket(serverSocket.accept());
+    Peer peer = DFSUtilClient.peerFromSocket(serverSocket.accept());
     return peer;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
deleted file mode 100644
index 23407f8..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * A little struct class to wrap an InputStream and an OutputStream.
- */
-@InterfaceAudience.Private
-public class IOStreamPair {
-  public final InputStream in;
-  public final OutputStream out;
-  
-  public IOStreamPair(InputStream in, OutputStream out) {
-    this.in = in;
-    this.out = out;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java
deleted file mode 100644
index 9e6a43d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.net.InetAddress;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * Class used to indicate whether a channel is trusted or not.
- * The default implementation is to return false indicating that
- * the channel is not trusted.
- * This class can be overridden to provide custom logic to determine
- * whether a channel is trusted or not. 
- * The custom class can be specified via configuration.
- *
- */
-public class TrustedChannelResolver implements Configurable {
-  Configuration conf;
-
-  /**
-   * Returns an instance of TrustedChannelResolver.
-   * Looks up the configuration to see if there is custom class specified.
-   * @param conf
-   * @return TrustedChannelResolver
-   */
-  public static TrustedChannelResolver getInstance(Configuration conf) {
-    Class<? extends TrustedChannelResolver> clazz =
-      conf.getClass(
-          DFSConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS,
-          TrustedChannelResolver.class, TrustedChannelResolver.class);
-    return ReflectionUtils.newInstance(clazz, conf);
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  /**
-   * Return boolean value indicating whether a channel is trusted or not
-   * from a client's perspective.
-   * @return true if the channel is trusted and false otherwise.
-   */
-  public boolean isTrusted() {
-    return false;
-  }
-
-
-  /**
-   * Identify boolean value indicating whether a channel is trusted or not.
-   * @param peerAddress address of the peer
-   * @return true if the channel is trusted and false otherwise.
-   */
-  public boolean isTrusted(InetAddress peerAddress) {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
deleted file mode 100644
index 959cba0..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-
-/**
- * Creates a new {@link DataEncryptionKey} on demand.
- */
-@InterfaceAudience.Private
-public interface DataEncryptionKeyFactory {
-
-  /**
-   * Creates a new DataEncryptionKey.
-   *
-   * @return DataEncryptionKey newly created
-   * @throws IOException for any error
-   */
-  DataEncryptionKey newDataEncryptionKey() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
deleted file mode 100644
index 852819f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
-import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.security.sasl.Sasl;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.CipherOption;
-import org.apache.hadoop.crypto.CipherSuite;
-import org.apache.hadoop.crypto.CryptoCodec;
-import org.apache.hadoop.crypto.CryptoInputStream;
-import org.apache.hadoop.crypto.CryptoOutputStream;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.security.SaslPropertiesResolver;
-import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.net.InetAddresses;
-import com.google.protobuf.ByteString;
-
-/**
- * Utility methods implementing SASL negotiation for DataTransferProtocol.
- */
-@InterfaceAudience.Private
-public final class DataTransferSaslUtil {
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-    DataTransferSaslUtil.class);
-
-  /**
-   * Delimiter for the three-part SASL username string.
-   */
-  public static final String NAME_DELIMITER = " ";
-
-  /**
-   * Sent by clients and validated by servers. We use a number that's unlikely
-   * to ever be sent as the value of the DATA_TRANSFER_VERSION.
-   */
-  public static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
-
-  /**
-   * Checks that SASL negotiation has completed for the given participant, and
-   * the negotiated quality of protection is included in the given SASL
-   * properties and therefore acceptable.
-   *
-   * @param sasl participant to check
-   * @param saslProps properties of SASL negotiation
-   * @throws IOException for any error
-   */
-  public static void checkSaslComplete(SaslParticipant sasl,
-      Map<String, String> saslProps) throws IOException {
-    if (!sasl.isComplete()) {
-      throw new IOException("Failed to complete SASL handshake");
-    }
-    Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList(
-      saslProps.get(Sasl.QOP).split(",")));
-    String negotiatedQop = sasl.getNegotiatedQop();
-    LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}",
-      requestedQop, negotiatedQop);
-    if (!requestedQop.contains(negotiatedQop)) {
-      throw new IOException(String.format("SASL handshake completed, but " +
-        "channel does not have acceptable quality of protection, " +
-        "requested = %s, negotiated = %s", requestedQop, negotiatedQop));
-    }
-  }
-  
-  /**
-   * Check whether requested SASL Qop contains privacy.
-   * 
-   * @param saslProps properties of SASL negotiation
-   * @return boolean true if privacy exists
-   */
-  public static boolean requestedQopContainsPrivacy(
-      Map<String, String> saslProps) {
-    Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList(
-        saslProps.get(Sasl.QOP).split(",")));
-    return requestedQop.contains("auth-conf");
-  }
-
-  /**
-   * Creates SASL properties required for an encrypted SASL negotiation.
-   *
-   * @param encryptionAlgorithm to use for SASL negotation
-   * @return properties of encrypted SASL negotiation
-   */
-  public static Map<String, String> createSaslPropertiesForEncryption(
-      String encryptionAlgorithm) {
-    Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
-    saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
-    saslProps.put(Sasl.SERVER_AUTH, "true");
-    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
-    return saslProps;
-  }
-
-  /**
-   * For an encrypted SASL negotiation, encodes an encryption key to a SASL
-   * password.
-   *
-   * @param encryptionKey to encode
-   * @return key encoded as SASL password
-   */
-  public static char[] encryptionKeyToPassword(byte[] encryptionKey) {
-    return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8)
-      .toCharArray();
-  }
-
-  /**
-   * Returns InetAddress from peer.  The getRemoteAddressString has the form
-   * [host][/ip-address]:port.  The host may be missing.  The IP address (and
-   * preceding '/') may be missing.  The port preceded by ':' is always present.
-   *
-   * @param peer
-   * @return InetAddress from peer
-   */
-  public static InetAddress getPeerAddress(Peer peer) {
-    String remoteAddr = peer.getRemoteAddressString().split(":")[0];
-    int slashIdx = remoteAddr.indexOf('/');
-    return InetAddresses.forString(slashIdx != -1 ?
-        remoteAddr.substring(slashIdx + 1, remoteAddr.length()) :
-        remoteAddr);
-  }
-
-  /**
-   * Creates a SaslPropertiesResolver from the given configuration.  This method
-   * works by cloning the configuration, translating configuration properties
-   * specific to DataTransferProtocol to what SaslPropertiesResolver expects,
-   * and then delegating to SaslPropertiesResolver for initialization.  This
-   * method returns null if SASL protection has not been configured for
-   * DataTransferProtocol.
-   *
-   * @param conf configuration to read
-   * @return SaslPropertiesResolver for DataTransferProtocol, or null if not
-   *   configured
-   */
-  public static SaslPropertiesResolver getSaslPropertiesResolver(
-      Configuration conf) {
-    String qops = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
-    if (qops == null || qops.isEmpty()) {
-      LOG.debug("DataTransferProtocol not using SaslPropertiesResolver, no " +
-        "QOP found in configuration for {}", DFS_DATA_TRANSFER_PROTECTION_KEY);
-      return null;
-    }
-    Configuration saslPropsResolverConf = new Configuration(conf);
-    saslPropsResolverConf.set(HADOOP_RPC_PROTECTION, qops);
-    Class<? extends SaslPropertiesResolver> resolverClass = conf.getClass(
-      HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
-      SaslPropertiesResolver.class, SaslPropertiesResolver.class);
-    resolverClass = conf.getClass(DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY,
-      resolverClass, SaslPropertiesResolver.class);
-    saslPropsResolverConf.setClass(HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
-      resolverClass, SaslPropertiesResolver.class);
-    SaslPropertiesResolver resolver = SaslPropertiesResolver.getInstance(
-      saslPropsResolverConf);
-    LOG.debug("DataTransferProtocol using SaslPropertiesResolver, configured " +
-      "QOP {} = {}, configured class {} = {}", DFS_DATA_TRANSFER_PROTECTION_KEY, qops, 
-      DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, resolverClass);
-    return resolver;
-  }
-
-  /**
-   * Reads a SASL negotiation message.
-   *
-   * @param in stream to read
-   * @return bytes of SASL negotiation messsage
-   * @throws IOException for any error
-   */
-  public static byte[] readSaslMessage(InputStream in) throws IOException {
-    DataTransferEncryptorMessageProto proto =
-        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
-    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
-      throw new InvalidEncryptionKeyException(proto.getMessage());
-    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
-      throw new IOException(proto.getMessage());
-    } else {
-      return proto.getPayload().toByteArray();
-    }
-  }
-  
-  /**
-   * Reads a SASL negotiation message and negotiation cipher options. 
-   * 
-   * @param in stream to read
-   * @param cipherOptions list to store negotiation cipher options
-   * @return byte[] SASL negotiation message
-   * @throws IOException for any error
-   */
-  public static byte[] readSaslMessageAndNegotiationCipherOptions(
-      InputStream in, List<CipherOption> cipherOptions) throws IOException {
-    DataTransferEncryptorMessageProto proto =
-        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
-    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
-      throw new InvalidEncryptionKeyException(proto.getMessage());
-    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
-      throw new IOException(proto.getMessage());
-    } else {
-      List<CipherOptionProto> optionProtos = proto.getCipherOptionList();
-      if (optionProtos != null) {
-        for (CipherOptionProto optionProto : optionProtos) {
-          cipherOptions.add(PBHelper.convert(optionProto));
-        }
-      }
-      return proto.getPayload().toByteArray();
-    }
-  }
-  
-  /**
-   * Negotiate a cipher option which server supports.
-   * 
-   * @param conf the configuration
-   * @param options the cipher options which client supports
-   * @return CipherOption negotiated cipher option
-   */
-  public static CipherOption negotiateCipherOption(Configuration conf,
-      List<CipherOption> options) throws IOException {
-    // Negotiate cipher suites if configured.  Currently, the only supported
-    // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
-    // values for future expansion.
-    String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
-    if (cipherSuites == null || cipherSuites.isEmpty()) {
-      return null;
-    }
-    if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
-      throw new IOException(String.format("Invalid cipher suite, %s=%s",
-          DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
-    }
-    if (options != null) {
-      for (CipherOption option : options) {
-        CipherSuite suite = option.getCipherSuite();
-        if (suite == CipherSuite.AES_CTR_NOPADDING) {
-          int keyLen = conf.getInt(
-              DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY,
-              DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT) / 8;
-          CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
-          byte[] inKey = new byte[keyLen];
-          byte[] inIv = new byte[suite.getAlgorithmBlockSize()];
-          byte[] outKey = new byte[keyLen];
-          byte[] outIv = new byte[suite.getAlgorithmBlockSize()];
-          codec.generateSecureRandom(inKey);
-          codec.generateSecureRandom(inIv);
-          codec.generateSecureRandom(outKey);
-          codec.generateSecureRandom(outIv);
-          return new CipherOption(suite, inKey, inIv, outKey, outIv);
-        }
-      }
-    }
-    return null;
-  }
-  
-  /**
-   * Send SASL message and negotiated cipher option to client.
-   * 
-   * @param out stream to receive message
-   * @param payload to send
-   * @param option negotiated cipher option
-   * @throws IOException for any error
-   */
-  public static void sendSaslMessageAndNegotiatedCipherOption(
-      OutputStream out, byte[] payload, CipherOption option) 
-          throws IOException {
-    DataTransferEncryptorMessageProto.Builder builder =
-        DataTransferEncryptorMessageProto.newBuilder();
-    
-    builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
-    if (payload != null) {
-      builder.setPayload(ByteString.copyFrom(payload));
-    }
-    if (option != null) {
-      builder.addCipherOption(PBHelper.convert(option));
-    }
-    
-    DataTransferEncryptorMessageProto proto = builder.build();
-    proto.writeDelimitedTo(out);
-    out.flush();
-  }
-  
-  /**
-   * Create IOStreamPair of {@link org.apache.hadoop.crypto.CryptoInputStream}
-   * and {@link org.apache.hadoop.crypto.CryptoOutputStream}
-   * 
-   * @param conf the configuration
-   * @param cipherOption negotiated cipher option
-   * @param out underlying output stream
-   * @param in underlying input stream
-   * @param isServer is server side
-   * @return IOStreamPair the stream pair
-   * @throws IOException for any error
-   */
-  public static IOStreamPair createStreamPair(Configuration conf,
-      CipherOption cipherOption, OutputStream out, InputStream in, 
-      boolean isServer) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Creating IOStreamPair of CryptoInputStream and " +
-          "CryptoOutputStream.");
-    }
-    CryptoCodec codec = CryptoCodec.getInstance(conf, 
-        cipherOption.getCipherSuite());
-    byte[] inKey = cipherOption.getInKey();
-    byte[] inIv = cipherOption.getInIv();
-    byte[] outKey = cipherOption.getOutKey();
-    byte[] outIv = cipherOption.getOutIv();
-    InputStream cIn = new CryptoInputStream(in, codec, 
-        isServer ? inKey : outKey, isServer ? inIv : outIv);
-    OutputStream cOut = new CryptoOutputStream(out, codec, 
-        isServer ? outKey : inKey, isServer ? outIv : inIv);
-    return new IOStreamPair(cIn, cOut);
-  }
-
-  /**
-   * Sends a SASL negotiation message indicating an error.
-   *
-   * @param out stream to receive message
-   * @param message to send
-   * @throws IOException for any error
-   */
-  public static void sendGenericSaslErrorMessage(OutputStream out,
-      String message) throws IOException {
-    sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
-  }
-
-  /**
-   * Sends a SASL negotiation message.
-   *
-   * @param out stream to receive message
-   * @param payload to send
-   * @throws IOException for any error
-   */
-  public static void sendSaslMessage(OutputStream out, byte[] payload)
-      throws IOException {
-    sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
-  }
-  
-  /**
-   * Send a SASL negotiation message and negotiation cipher options to server.
-   * 
-   * @param out stream to receive message
-   * @param payload to send
-   * @param options cipher options to negotiate
-   * @throws IOException for any error
-   */
-  public static void sendSaslMessageAndNegotiationCipherOptions(
-      OutputStream out, byte[] payload, List<CipherOption> options)
-          throws IOException {
-    DataTransferEncryptorMessageProto.Builder builder =
-        DataTransferEncryptorMessageProto.newBuilder();
-    
-    builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
-    if (payload != null) {
-      builder.setPayload(ByteString.copyFrom(payload));
-    }
-    if (options != null) {
-      builder.addAllCipherOption(PBHelper.convertCipherOptions(options));
-    }
-    
-    DataTransferEncryptorMessageProto proto = builder.build();
-    proto.writeDelimitedTo(out);
-    out.flush();
-  }
-  
-  /**
-   * Read SASL message and negotiated cipher option from server.
-   * 
-   * @param in stream to read
-   * @return SaslResponseWithNegotiatedCipherOption SASL message and 
-   * negotiated cipher option
-   * @throws IOException for any error
-   */
-  public static SaslResponseWithNegotiatedCipherOption
-      readSaslMessageAndNegotiatedCipherOption(InputStream in)
-          throws IOException {
-    DataTransferEncryptorMessageProto proto =
-        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
-    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
-      throw new InvalidEncryptionKeyException(proto.getMessage());
-    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
-      throw new IOException(proto.getMessage());
-    } else {
-      byte[] response = proto.getPayload().toByteArray();
-      List<CipherOption> options = PBHelper.convertCipherOptionProtos(
-          proto.getCipherOptionList());
-      CipherOption option = null;
-      if (options != null && !options.isEmpty()) {
-        option = options.get(0);
-      }
-      return new SaslResponseWithNegotiatedCipherOption(response, option);
-    }
-  }
-  
-  /**
-   * Encrypt the key and iv of the negotiated cipher option.
-   * 
-   * @param option negotiated cipher option
-   * @param sasl SASL participant representing server
-   * @return CipherOption negotiated cipher option which contains the 
-   * encrypted key and iv
-   * @throws IOException for any error
-   */
-  public static CipherOption wrap(CipherOption option, SaslParticipant sasl) 
-      throws IOException {
-    if (option != null) {
-      byte[] inKey = option.getInKey();
-      if (inKey != null) {
-        inKey = sasl.wrap(inKey, 0, inKey.length);
-      }
-      byte[] outKey = option.getOutKey();
-      if (outKey != null) {
-        outKey = sasl.wrap(outKey, 0, outKey.length);
-      }
-      return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
-          outKey, option.getOutIv());
-    }
-    
-    return null;
-  }
-  
-  /**
-   * Decrypt the key and iv of the negotiated cipher option.
-   * 
-   * @param option negotiated cipher option
-   * @param sasl SASL participant representing client
-   * @return CipherOption negotiated cipher option which contains the 
-   * decrypted key and iv
-   * @throws IOException for any error
-   */
-  public static CipherOption unwrap(CipherOption option, SaslParticipant sasl)
-      throws IOException {
-    if (option != null) {
-      byte[] inKey = option.getInKey();
-      if (inKey != null) {
-        inKey = sasl.unwrap(inKey, 0, inKey.length);
-      }
-      byte[] outKey = option.getOutKey();
-      if (outKey != null) {
-        outKey = sasl.unwrap(outKey, 0, outKey.length);
-      }
-      return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(),
-          outKey, option.getOutIv());
-    }
-    
-    return null;
-  }
-
-  /**
-   * Sends a SASL negotiation message.
-   *
-   * @param out stream to receive message
-   * @param status negotiation status
-   * @param payload to send
-   * @param message to send
-   * @throws IOException for any error
-   */
-  public static void sendSaslMessage(OutputStream out,
-      DataTransferEncryptorStatus status, byte[] payload, String message)
-          throws IOException {
-    DataTransferEncryptorMessageProto.Builder builder =
-        DataTransferEncryptorMessageProto.newBuilder();
-    
-    builder.setStatus(status);
-    if (payload != null) {
-      builder.setPayload(ByteString.copyFrom(payload));
-    }
-    if (message != null) {
-      builder.setMessage(message);
-    }
-    
-    DataTransferEncryptorMessageProto proto = builder.build();
-    proto.writeDelimitedTo(out);
-    out.flush();
-  }
-
-  /**
-   * There is no reason to instantiate this class.
-   */
-  private DataTransferSaslUtil() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
deleted file mode 100644
index 00b131f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
-import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.CipherOption;
-import org.apache.hadoop.crypto.CipherSuite;
-import org.apache.hadoop.hdfs.net.EncryptedPeer;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.security.SaslPropertiesResolver;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-
-/**
- * Negotiates SASL for DataTransferProtocol on behalf of a client.  There are
- * two possible supported variants of SASL negotiation: either a general-purpose
- * negotiation supporting any quality of protection, or a specialized
- * negotiation that enforces privacy as the quality of protection using a
- * cryptographically strong encryption key.
- *
- * This class is used in both the HDFS client and the DataNode.  The DataNode
- * needs it, because it acts as a client to other DataNodes during write
- * pipelines and block transfers.
- */
-@InterfaceAudience.Private
-public class SaslDataTransferClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-    SaslDataTransferClient.class);
-
-  private final Configuration conf;
-  private final AtomicBoolean fallbackToSimpleAuth;
-  private final SaslPropertiesResolver saslPropsResolver;
-  private final TrustedChannelResolver trustedChannelResolver;
-
-  /**
-   * Creates a new SaslDataTransferClient.  This constructor is used in cases
-   * where it is not relevant to track if a secure client did a fallback to
-   * simple auth.  For intra-cluster connections between data nodes in the same
-   * cluster, we can assume that all run under the same security configuration.
-   *
-   * @param conf the configuration
-   * @param saslPropsResolver for determining properties of SASL negotiation
-   * @param trustedChannelResolver for identifying trusted connections that do
-   *   not require SASL negotiation
-   */
-  public SaslDataTransferClient(Configuration conf, 
-      SaslPropertiesResolver saslPropsResolver,
-      TrustedChannelResolver trustedChannelResolver) {
-    this(conf, saslPropsResolver, trustedChannelResolver, null);
-  }
-
-  /**
-   * Creates a new SaslDataTransferClient.
-   *
-   * @param conf the configuration
-   * @param saslPropsResolver for determining properties of SASL negotiation
-   * @param trustedChannelResolver for identifying trusted connections that do
-   *   not require SASL negotiation
-   * @param fallbackToSimpleAuth checked on each attempt at general SASL
-   *   handshake, if true forces use of simple auth
-   */
-  public SaslDataTransferClient(Configuration conf, 
-      SaslPropertiesResolver saslPropsResolver,
-      TrustedChannelResolver trustedChannelResolver,
-      AtomicBoolean fallbackToSimpleAuth) {
-    this.conf = conf;
-    this.fallbackToSimpleAuth = fallbackToSimpleAuth;
-    this.saslPropsResolver = saslPropsResolver;
-    this.trustedChannelResolver = trustedChannelResolver;
-  }
-
-  /**
-   * Sends client SASL negotiation for a newly allocated socket if required.
-   *
-   * @param socket connection socket
-   * @param underlyingOut connection output stream
-   * @param underlyingIn connection input stream
-   * @param encryptionKeyFactory for creation of an encryption key
-   * @param accessToken connection block access token
-   * @param datanodeId ID of destination DataNode
-   * @return new pair of streams, wrapped after SASL negotiation
-   * @throws IOException for any error
-   */
-  public IOStreamPair newSocketSend(Socket socket, OutputStream underlyingOut,
-      InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
-      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
-      throws IOException {
-    // The encryption key factory only returns a key if encryption is enabled.
-    DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ?
-      encryptionKeyFactory.newDataEncryptionKey() : null;
-    IOStreamPair ios = send(socket.getInetAddress(), underlyingOut,
-      underlyingIn, encryptionKey, accessToken, datanodeId);
-    return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
-  }
-
-  /**
-   * Sends client SASL negotiation for a peer if required.
-   *
-   * @param peer connection peer
-   * @param encryptionKeyFactory for creation of an encryption key
-   * @param accessToken connection block access token
-   * @param datanodeId ID of destination DataNode
-   * @return new pair of streams, wrapped after SASL negotiation
-   * @throws IOException for any error
-   */
-  public Peer peerSend(Peer peer, DataEncryptionKeyFactory encryptionKeyFactory,
-      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
-      throws IOException {
-    IOStreamPair ios = checkTrustAndSend(getPeerAddress(peer),
-      peer.getOutputStream(), peer.getInputStream(), encryptionKeyFactory,
-      accessToken, datanodeId);
-    // TODO: Consider renaming EncryptedPeer to SaslPeer.
-    return ios != null ? new EncryptedPeer(peer, ios) : peer;
-  }
-
-  /**
-   * Sends client SASL negotiation for a socket if required.
-   *
-   * @param socket connection socket
-   * @param underlyingOut connection output stream
-   * @param underlyingIn connection input stream
-   * @param encryptionKeyFactory for creation of an encryption key
-   * @param accessToken connection block access token
-   * @param datanodeId ID of destination DataNode
-   * @return new pair of streams, wrapped after SASL negotiation
-   * @throws IOException for any error
-   */
-  public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut,
-      InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
-      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
-      throws IOException {
-    IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut,
-      underlyingIn, encryptionKeyFactory, accessToken, datanodeId);
-    return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
-  }
-
-  /**
-   * Checks if an address is already trusted and then sends client SASL
-   * negotiation if required.
-   *
-   * @param addr connection address
-   * @param underlyingOut connection output stream
-   * @param underlyingIn connection input stream
-   * @param encryptionKeyFactory for creation of an encryption key
-   * @param accessToken connection block access token
-   * @param datanodeId ID of destination DataNode
-   * @return new pair of streams, wrapped after SASL negotiation
-   * @throws IOException for any error
-   */
-  private IOStreamPair checkTrustAndSend(InetAddress addr,
-      OutputStream underlyingOut, InputStream underlyingIn,
-      DataEncryptionKeyFactory encryptionKeyFactory,
-      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
-      throws IOException {
-    if (!trustedChannelResolver.isTrusted() &&
-        !trustedChannelResolver.isTrusted(addr)) {
-      // The encryption key factory only returns a key if encryption is enabled.
-      DataEncryptionKey encryptionKey =
-        encryptionKeyFactory.newDataEncryptionKey();
-      return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken,
-        datanodeId);
-    } else {
-      LOG.debug(
-        "SASL client skipping handshake on trusted connection for addr = {}, "
-        + "datanodeId = {}", addr, datanodeId);
-      return null;
-    }
-  }
-
-  /**
-   * Sends client SASL negotiation if required.  Determines the correct type of
-   * SASL handshake based on configuration.
-   *
-   * @param addr connection address
-   * @param underlyingOut connection output stream
-   * @param underlyingIn connection input stream
-   * @param encryptionKey for an encrypted SASL handshake
-   * @param accessToken connection block access token
-   * @param datanodeId ID of destination DataNode
-   * @return new pair of streams, wrapped after SASL negotiation
-   * @throws IOException for any error
-   */
-  private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
-      InputStream underlyingIn, DataEncryptionKey encryptionKey,
-      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
-      throws IOException {
-    if (encryptionKey != null) {
-      LOG.debug(
-        "SASL client doing encrypted handshake for addr = {}, datanodeId = {}",
-        addr, datanodeId);
-      return getEncryptedStreams(underlyingOut, underlyingIn,
-        encryptionKey);
-    } else if (!UserGroupInformation.isSecurityEnabled()) {
-      LOG.debug(
-        "SASL client skipping handshake in unsecured configuration for "
-        + "addr = {}, datanodeId = {}", addr, datanodeId);
-      return null;
-    } else if (SecurityUtil.isPrivilegedPort(datanodeId.getXferPort())) {
-      LOG.debug(
-        "SASL client skipping handshake in secured configuration with "
-        + "privileged port for addr = {}, datanodeId = {}", addr, datanodeId);
-      return null;
-    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
-      LOG.debug(
-        "SASL client skipping handshake in secured configuration with "
-        + "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId);
-      return null;
-    } else if (saslPropsResolver != null) {
-      LOG.debug(
-        "SASL client doing general handshake for addr = {}, datanodeId = {}",
-        addr, datanodeId);
-      return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken,
-        datanodeId);
-    } else {
-      // It's a secured cluster using non-privileged ports, but no SASL.  The
-      // only way this can happen is if the DataNode has
-      // ignore.secure.ports.for.testing configured, so this is a rare edge case.
-      LOG.debug(
-        "SASL client skipping handshake in secured configuration with no SASL "
-        + "protection configured for addr = {}, datanodeId = {}",
-        addr, datanodeId);
-      return null;
-    }
-  }
-
-  /**
-   * Sends client SASL negotiation for specialized encrypted handshake.
-   *
-   * @param underlyingOut connection output stream
-   * @param underlyingIn connection input stream
-   * @param encryptionKey for an encrypted SASL handshake
-   * @return new pair of streams, wrapped after SASL negotiation
-   * @throws IOException for any error
-   */
-  private IOStreamPair getEncryptedStreams(OutputStream underlyingOut,
-      InputStream underlyingIn, DataEncryptionKey encryptionKey)
-      throws IOException {
-    Map<String, String> saslProps = createSaslPropertiesForEncryption(
-      encryptionKey.encryptionAlgorithm);
-
-    LOG.debug("Client using encryption algorithm {}",
-      encryptionKey.encryptionAlgorithm);
-
-    String userName = getUserNameFromEncryptionKey(encryptionKey);
-    char[] password = encryptionKeyToPassword(encryptionKey.encryptionKey);
-    CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
-      password);
-    return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps,
-      callbackHandler);
-  }
-
-  /**
-   * The SASL username for an encrypted handshake consists of the keyId,
-   * blockPoolId, and nonce with the first two encoded as Strings, and the third
-   * encoded using Base64. The fields are each separated by a single space.
-   * 
-   * @param encryptionKey the encryption key to encode as a SASL username.
-   * @return encoded username containing keyId, blockPoolId, and nonce
-   */
-  private static String getUserNameFromEncryptionKey(
-      DataEncryptionKey encryptionKey) {
-    return encryptionKey.keyId + NAME_DELIMITER +
-        encryptionKey.blockPoolId + NAME_DELIMITER +
-        new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
-  }
-
-  /**
-   * Sets user name and password when asked by the client-side SASL object.
-   */
-  private static final class SaslClientCallbackHandler
-      implements CallbackHandler {
-
-    private final char[] password;
-    private final String userName;
-
-    /**
-     * Creates a new SaslClientCallbackHandler.
-     *
-     * @param userName SASL user name
-     * @Param password SASL password
-     */
-    public SaslClientCallbackHandler(String userName, char[] password) {
-      this.password = password;
-      this.userName = userName;
-    }
-
-    @Override
-    public void handle(Callback[] callbacks) throws IOException,
-        UnsupportedCallbackException {
-      NameCallback nc = null;
-      PasswordCallback pc = null;
-      RealmCallback rc = null;
-      for (Callback callback : callbacks) {
-        if (callback instanceof RealmChoiceCallback) {
-          continue;
-        } else if (callback instanceof NameCallback) {
-          nc = (NameCallback) callback;
-        } else if (callback instanceof PasswordCallback) {
-          pc = (PasswordCallback) callback;
-        } else if (callback instanceof RealmCallback) {
-          rc = (RealmCallback) callback;
-        } else {
-          throw new UnsupportedCallbackException(callback,
-              "Unrecognized SASL client callback");
-        }
-      }
-      if (nc != null) {
-        nc.setName(userName);
-      }
-      if (pc != null) {
-        pc.setPassword(password);
-      }
-      if (rc != null) {
-        rc.setText(rc.getDefaultText());
-      }
-    }
-  }
-
-  /**
-   * Sends client SASL negotiation for general-purpose handshake.
-   *
-   * @param addr connection address
-   * @param underlyingOut connection output stream
-   * @param underlyingIn connection input stream
-   * @param accessToken connection block access token
-   * @param datanodeId ID of destination DataNode
-   * @return new pair of streams, wrapped after SASL negotiation
-   * @throws IOException for any error
-   */
-  private IOStreamPair getSaslStreams(InetAddress addr,
-      OutputStream underlyingOut, InputStream underlyingIn,
-      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
-      throws IOException {
-    Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
-
-    String userName = buildUserName(accessToken);
-    char[] password = buildClientPassword(accessToken);
-    CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
-      password);
-    return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps,
-      callbackHandler);
-  }
-
-  /**
-   * Builds the client's user name for the general-purpose handshake, consisting
-   * of the base64-encoded serialized block access token identifier.  Note that
-   * this includes only the token identifier, not the token itself, which would
-   * include the password.  The password is a shared secret, and we must not
-   * write it on the network during the SASL authentication exchange.
-   *
-   * @param blockToken for block access
-   * @return SASL user name
-   */
-  private static String buildUserName(Token<BlockTokenIdentifier> blockToken) {
-    return new String(Base64.encodeBase64(blockToken.getIdentifier(), false),
-      Charsets.UTF_8);
-  }
-
-  /**
-   * Calculates the password on the client side for the general-purpose
-   * handshake.  The password consists of the block access token's password.
-   *
-   * @param blockToken for block access
-   * @return SASL password
-   */    
-  private char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
-    return new String(Base64.encodeBase64(blockToken.getPassword(), false),
-      Charsets.UTF_8).toCharArray();
-  }
-
-  /**
-   * This method actually executes the client-side SASL handshake.
-   *
-   * @param underlyingOut connection output stream
-   * @param underlyingIn connection input stream
-   * @param userName SASL user name
-   * @param saslProps properties of SASL negotiation
-   * @param callbackHandler for responding to SASL callbacks
-   * @return new pair of streams, wrapped after SASL negotiation
-   * @throws IOException for any error
-   */
-  private IOStreamPair doSaslHandshake(OutputStream underlyingOut,
-      InputStream underlyingIn, String userName, Map<String, String> saslProps,
-      CallbackHandler callbackHandler) throws IOException {
-
-    DataOutputStream out = new DataOutputStream(underlyingOut);
-    DataInputStream in = new DataInputStream(underlyingIn);
-
-    SaslParticipant sasl= SaslParticipant.createClientSaslParticipant(userName,
-      saslProps, callbackHandler);
-
-    out.writeInt(SASL_TRANSFER_MAGIC_NUMBER);
-    out.flush();
-
-    try {
-      // Start of handshake - "initial response" in SASL terminology.
-      sendSaslMessage(out, new byte[0]);
-
-      // step 1
-      byte[] remoteResponse = readSaslMessage(in);
-      byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
-      List<CipherOption> cipherOptions = null;
-      if (requestedQopContainsPrivacy(saslProps)) {
-        // Negotiate cipher suites if configured.  Currently, the only supported
-        // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
-        // values for future expansion.
-        String cipherSuites = conf.get(
-            DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
-        if (cipherSuites != null && !cipherSuites.isEmpty()) {
-          if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
-            throw new IOException(String.format("Invalid cipher suite, %s=%s",
-                DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
-          }
-          CipherOption option = new CipherOption(CipherSuite.AES_CTR_NOPADDING);
-          cipherOptions = Lists.newArrayListWithCapacity(1);
-          cipherOptions.add(option);
-        }
-      }
-      sendSaslMessageAndNegotiationCipherOptions(out, localResponse, 
-          cipherOptions);
-
-      // step 2 (client-side only)
-      SaslResponseWithNegotiatedCipherOption response = 
-          readSaslMessageAndNegotiatedCipherOption(in);
-      localResponse = sasl.evaluateChallengeOrResponse(response.payload);
-      assert localResponse == null;
-
-      // SASL handshake is complete
-      checkSaslComplete(sasl, saslProps);
-
-      CipherOption cipherOption = null;
-      if (sasl.isNegotiatedQopPrivacy()) {
-        // Unwrap the negotiated cipher option
-        cipherOption = unwrap(response.cipherOption, sasl);
-      }
-
-      // If negotiated cipher option is not null, we will use it to create 
-      // stream pair.
-      return cipherOption != null ? createStreamPair(
-          conf, cipherOption, underlyingOut, underlyingIn, false) : 
-            sasl.createStreamPair(out, in);
-    } catch (IOException ioe) {
-      sendGenericSaslErrorMessage(out, ioe.getMessage());
-      throw ioe;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
index f060beb..95965b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
 
 import java.io.ByteArrayInputStream;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
deleted file mode 100644
index f14a075..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.Map;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.security.SaslInputStream;
-import org.apache.hadoop.security.SaslOutputStream;
-
-/**
- * Strongly inspired by Thrift's TSaslTransport class.
- *
- * Used to abstract over the <code>SaslServer</code> and
- * <code>SaslClient</code> classes, which share a lot of their interface, but
- * unfortunately don't share a common superclass.
- */
-@InterfaceAudience.Private
-class SaslParticipant {
-
-  // This has to be set as part of the SASL spec, but it don't matter for
-  // our purposes, but may not be empty. It's sent over the wire, so use
-  // a short string.
-  private static final String SERVER_NAME = "0";
-  private static final String PROTOCOL = "hdfs";
-  private static final String MECHANISM = "DIGEST-MD5";
-
-  // One of these will always be null.
-  private final SaslServer saslServer;
-  private final SaslClient saslClient;
-
-  /**
-   * Creates a SaslParticipant wrapping a SaslServer.
-   *
-   * @param saslProps properties of SASL negotiation
-   * @param callbackHandler for handling all SASL callbacks
-   * @return SaslParticipant wrapping SaslServer
-   * @throws SaslException for any error
-   */
-  public static SaslParticipant createServerSaslParticipant(
-      Map<String, String> saslProps, CallbackHandler callbackHandler)
-      throws SaslException {
-    return new SaslParticipant(Sasl.createSaslServer(MECHANISM,
-      PROTOCOL, SERVER_NAME, saslProps, callbackHandler));
-  }
-
-  /**
-   * Creates a SaslParticipant wrapping a SaslClient.
-   *
-   * @param userName SASL user name
-   * @param saslProps properties of SASL negotiation
-   * @param callbackHandler for handling all SASL callbacks
-   * @return SaslParticipant wrapping SaslClient
-   * @throws SaslException for any error
-   */
-  public static SaslParticipant createClientSaslParticipant(String userName,
-      Map<String, String> saslProps, CallbackHandler callbackHandler)
-      throws SaslException {
-    return new SaslParticipant(Sasl.createSaslClient(new String[] { MECHANISM },
-      userName, PROTOCOL, SERVER_NAME, saslProps, callbackHandler));
-  }
-
-  /**
-   * Private constructor wrapping a SaslServer.
-   *
-   * @param saslServer to wrap
-   */
-  private SaslParticipant(SaslServer saslServer) {
-    this.saslServer = saslServer;
-    this.saslClient = null;
-  }
-
-  /**
-   * Private constructor wrapping a SaslClient.
-   *
-   * @param saslClient to wrap
-   */
-  private SaslParticipant(SaslClient saslClient) {
-    this.saslServer = null;
-    this.saslClient = saslClient;
-  }
-
-  /**
-   * @see {@link SaslServer#evaluateResponse}
-   * @see {@link SaslClient#evaluateChallenge}
-   */
-  public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse)
-      throws SaslException {
-    if (saslClient != null) {
-      return saslClient.evaluateChallenge(challengeOrResponse);
-    } else {
-      return saslServer.evaluateResponse(challengeOrResponse);
-    }
-  }
-
-  /**
-   * After successful SASL negotation, returns the negotiated quality of
-   * protection.
-   *
-   * @return negotiated quality of protection
-   */
-  public String getNegotiatedQop() {
-    if (saslClient != null) {
-      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
-    } else {
-      return (String) saslServer.getNegotiatedProperty(Sasl.QOP);
-    }
-  }
-  
-  /**
-   * After successful SASL negotiation, returns whether it's QOP privacy
-   * 
-   * @return boolean whether it's QOP privacy
-   */
-  public boolean isNegotiatedQopPrivacy() {
-    String qop = getNegotiatedQop();
-    return qop != null && "auth-conf".equalsIgnoreCase(qop);
-  }
-  
-  /**
-   * Wraps a byte array.
-   * 
-   * @param bytes The array containing the bytes to wrap.
-   * @param off The starting position at the array
-   * @param len The number of bytes to wrap
-   * @return byte[] wrapped bytes
-   * @throws SaslException if the bytes cannot be successfully wrapped
-   */
-  public byte[] wrap(byte[] bytes, int off, int len) throws SaslException {
-    if (saslClient != null) {
-      return saslClient.wrap(bytes, off, len);
-    } else {
-      return saslServer.wrap(bytes, off, len);
-    }
-  }
-  
-  /**
-   * Unwraps a byte array.
-   * 
-   * @param bytes The array containing the bytes to unwrap.
-   * @param off The starting position at the array
-   * @param len The number of bytes to unwrap
-   * @return byte[] unwrapped bytes
-   * @throws SaslException if the bytes cannot be successfully unwrapped
-   */
-  public byte[] unwrap(byte[] bytes, int off, int len) throws SaslException {
-    if (saslClient != null) {
-      return saslClient.unwrap(bytes, off, len);
-    } else {
-      return saslServer.unwrap(bytes, off, len);
-    }
-  }
-
-  /**
-   * Returns true if SASL negotiation is complete.
-   *
-   * @return true if SASL negotiation is complete
-   */
-  public boolean isComplete() {
-    if (saslClient != null) {
-      return saslClient.isComplete();
-    } else {
-      return saslServer.isComplete();
-    }
-  }
-
-  /**
-   * Return some input/output streams that may henceforth have their
-   * communication encrypted, depending on the negotiated quality of protection.
-   *
-   * @param out output stream to wrap
-   * @param in input stream to wrap
-   * @return IOStreamPair wrapping the streams
-   */
-  public IOStreamPair createStreamPair(DataOutputStream out,
-      DataInputStream in) {
-    if (saslClient != null) {
-      return new IOStreamPair(
-          new SaslInputStream(in, saslClient),
-          new SaslOutputStream(out, saslClient));
-    } else {
-      return new IOStreamPair(
-          new SaslInputStream(in, saslServer),
-          new SaslOutputStream(out, saslServer));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
deleted file mode 100644
index f69441b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.crypto.CipherOption;
-
-@InterfaceAudience.Private
-public class SaslResponseWithNegotiatedCipherOption {
-  final byte[] payload;
-  final CipherOption cipherOption;
-  
-  public SaslResponseWithNegotiatedCipherOption(byte[] payload, 
-      CipherOption cipherOption) {
-    this.payload = payload;
-    this.cipherOption = cipherOption;
-  }
-}
\ No newline at end of file


Mime
View raw message