Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 814951869D for ; Thu, 3 Sep 2015 22:35:06 +0000 (UTC) Received: (qmail 86773 invoked by uid 500); 3 Sep 2015 22:35:05 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 86381 invoked by uid 500); 3 Sep 2015 22:35:05 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 86119 invoked by uid 99); 3 Sep 2015 22:35:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Sep 2015 22:35:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5D2ABE7EB6; Thu, 3 Sep 2015 22:35:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wheat9@apache.org To: common-commits@hadoop.apache.org Date: Thu, 03 Sep 2015 22:35:07 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/3] hadoop git commit: HDFS-9002. Move o.a.h.hdfs.net/*Peer classes to hdfs-client. Contributed by Mingliang Liu. HDFS-9002. Move o.a.h.hdfs.net/*Peer classes to hdfs-client. Contributed by Mingliang Liu. 
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d99018d6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d99018d6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d99018d6 Branch: refs/heads/branch-2 Commit: d99018d6bcce2d97f8627f10355b0503cd728ec8 Parents: 9871f57 Author: Haohui Mai Authored: Thu Sep 3 15:32:53 2015 -0700 Committer: Haohui Mai Committed: Thu Sep 3 15:33:22 2015 -0700 ---------------------------------------------------------------------- .../dev-support/findbugsExcludeFile.xml | 1 + .../org/apache/hadoop/hdfs/DFSUtilClient.java | 64 +++ .../hdfs/client/HdfsClientConfigKeys.java | 14 + .../apache/hadoop/hdfs/net/BasicInetPeer.java | 133 +++++ .../apache/hadoop/hdfs/net/EncryptedPeer.java | 142 +++++ .../org/apache/hadoop/hdfs/net/NioInetPeer.java | 136 +++++ .../java/org/apache/hadoop/hdfs/net/Peer.java | 8 +- .../protocol/datatransfer/IOStreamPair.java | 37 ++ .../datatransfer/TrustedChannelResolver.java | 81 +++ .../sasl/DataEncryptionKeyFactory.java | 38 ++ .../datatransfer/sasl/DataTransferSaslUtil.java | 519 +++++++++++++++++++ .../sasl/SaslDataTransferClient.java | 498 ++++++++++++++++++ .../datatransfer/sasl/SaslParticipant.java | 210 ++++++++ .../SaslResponseWithNegotiatedCipherOption.java | 33 ++ .../hadoop/hdfs/protocolPB/PBHelperClient.java | 102 ++++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../java/org/apache/hadoop/hdfs/DFSClient.java | 3 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 28 +- .../apache/hadoop/hdfs/net/BasicInetPeer.java | 133 ----- .../apache/hadoop/hdfs/net/EncryptedPeer.java | 142 ----- .../org/apache/hadoop/hdfs/net/NioInetPeer.java | 136 ----- .../apache/hadoop/hdfs/net/TcpPeerServer.java | 65 +-- .../protocol/datatransfer/IOStreamPair.java | 37 -- .../datatransfer/TrustedChannelResolver.java | 81 --- .../sasl/DataEncryptionKeyFactory.java | 38 -- .../datatransfer/sasl/DataTransferSaslUtil.java | 519 
------------------- .../sasl/SaslDataTransferClient.java | 498 ------------------ .../sasl/SaslDataTransferServer.java | 2 +- .../datatransfer/sasl/SaslParticipant.java | 210 -------- .../SaslResponseWithNegotiatedCipherOption.java | 33 -- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 111 +--- .../hdfs/server/namenode/FSDirXAttrOp.java | 8 +- .../hdfs/server/namenode/FSDirectory.java | 3 +- .../hdfs/server/namenode/NamenodeFsck.java | 4 +- .../apache/hadoop/hdfs/BlockReaderTestUtil.java | 3 +- .../org/apache/hadoop/hdfs/MiniDFSCluster.java | 2 +- .../hadoop/hdfs/TestEncryptedTransfer.java | 4 +- .../sasl/SaslDataTransferTestCase.java | 2 +- .../datatransfer/sasl/TestSaslDataTransfer.java | 2 +- .../hdfs/qjournal/TestSecureNNWithQJM.java | 2 +- .../blockmanagement/TestBlockTokenWithDFS.java | 4 +- .../datanode/TestDataNodeVolumeFailure.java | 4 +- 42 files changed, 2064 insertions(+), 2029 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml index 036ac09..515da24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml @@ -14,6 +14,7 @@ + http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index a89f556..b032250 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -27,16 +27,24 @@ import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.net.BasicInetPeer; +import org.apache.hadoop.hdfs.net.NioInetPeer; +import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.util.IOUtilsClient; import org.apache.hadoop.hdfs.web.WebHdfsConstants; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +54,10 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.Socket; import java.net.URI; import java.net.URISyntaxException; +import java.nio.channels.SocketChannel; import java.text.SimpleDateFormat; import java.util.Collection; 
import java.util.Collections; @@ -523,4 +533,58 @@ public class DFSUtilClient { } return keyProvider; } + + public static Peer peerFromSocket(Socket socket) + throws IOException { + Peer peer = null; + boolean success = false; + try { + // TCP_NODELAY is crucial here because of bad interactions between + // Nagle's Algorithm and Delayed ACKs. With connection keepalive + // between the client and DN, the conversation looks like: + // 1. Client -> DN: Read block X + // 2. DN -> Client: data for block X + // 3. Client -> DN: Status OK (successful read) + // 4. Client -> DN: Read block Y + // The fact that step #3 and #4 are both in the client->DN direction + // triggers Nagling. If the DN is using delayed ACKs, this results + // in a delay of 40ms or more. + // + // TCP_NODELAY disables nagling and thus avoids this performance + // disaster. + socket.setTcpNoDelay(true); + SocketChannel channel = socket.getChannel(); + if (channel == null) { + peer = new BasicInetPeer(socket); + } else { + peer = new NioInetPeer(socket); + } + success = true; + return peer; + } finally { + if (!success) { + if (peer != null) peer.close(); + socket.close(); + } + } + } + + public static Peer peerFromSocketAndKey( + SaslDataTransferClient saslClient, Socket s, + DataEncryptionKeyFactory keyFactory, + Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) + throws IOException { + Peer peer = null; + boolean success = false; + try { + peer = peerFromSocket(s); + peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId); + success = true; + return peer; + } finally { + if (!success) { + IOUtilsClient.cleanup(null, peer); + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 01cd23a..253bd4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -130,6 +130,20 @@ public interface HdfsClientConfigKeys { long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri"; + String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = + "dfs.encrypt.data.transfer.cipher.suites"; + + String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection"; + String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = ""; + String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = + "dfs.data.transfer.saslproperties.resolver.class"; + + String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY = + "dfs.encrypt.data.transfer.cipher.key.bitlength"; + int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128; + + String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class"; + String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY = PREFIX + "replica.accessor.builder.classes"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java new file mode 100644 index 0000000..212dbef --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.channels.ReadableByteChannel; + +import org.apache.hadoop.net.unix.DomainSocket; + +/** + * Represents a peer that we communicate with by using a basic Socket + * that has no associated Channel. + * + */ +public class BasicInetPeer implements Peer { + private final Socket socket; + private final OutputStream out; + private final InputStream in; + private final boolean isLocal; + + public BasicInetPeer(Socket socket) throws IOException { + this.socket = socket; + this.out = socket.getOutputStream(); + this.in = socket.getInputStream(); + this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress()); + } + + @Override + public ReadableByteChannel getInputStreamChannel() { + /* + * This Socket has no channel, so there's nothing to return here. 
+ */ + return null; + } + + @Override + public void setReadTimeout(int timeoutMs) throws IOException { + socket.setSoTimeout(timeoutMs); + } + + @Override + public int getReceiveBufferSize() throws IOException { + return socket.getReceiveBufferSize(); + } + + @Override + public boolean getTcpNoDelay() throws IOException { + return socket.getTcpNoDelay(); + } + + @Override + public void setWriteTimeout(int timeoutMs) { + /* + * We can't implement write timeouts. :( + * + * Java provides no facility to set a blocking write timeout on a Socket. + * You can simulate a blocking write with a timeout by using + * non-blocking I/O. However, we can't use nio here, because this Socket + * doesn't have an associated Channel. + * + * See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4031100 for + * more details. + */ + } + + @Override + public boolean isClosed() { + return socket.isClosed(); + } + + @Override + public void close() throws IOException { + socket.close(); + } + + @Override + public String getRemoteAddressString() { + return socket.getRemoteSocketAddress().toString(); + } + + @Override + public String getLocalAddressString() { + return socket.getLocalSocketAddress().toString(); + } + + @Override + public InputStream getInputStream() throws IOException { + return in; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return out; + } + + @Override + public boolean isLocal() { + return isLocal; + } + + @Override + public String toString() { + return "BasicInetPeer(" + socket.toString() + ")"; + } + + @Override + public DomainSocket getDomainSocket() { + return null; + } + + @Override + public boolean hasSecureChannel() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java new file mode 100644 index 0000000..da660c7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.net.unix.DomainSocket; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.ReadableByteChannel; + +/** + * Represents a peer that we communicate with by using an encrypted + * communications medium. + */ +@InterfaceAudience.Private +public class EncryptedPeer implements Peer { + private final Peer enclosedPeer; + + /** + * An encrypted InputStream. + */ + private final InputStream in; + + /** + * An encrypted OutputStream. + */ + private final OutputStream out; + + /** + * An encrypted ReadableByteChannel. 
+ */ + private final ReadableByteChannel channel; + + public EncryptedPeer(Peer enclosedPeer, IOStreamPair ios) { + this.enclosedPeer = enclosedPeer; + this.in = ios.in; + this.out = ios.out; + this.channel = ios.in instanceof ReadableByteChannel ? + (ReadableByteChannel)ios.in : null; + } + + @Override + public ReadableByteChannel getInputStreamChannel() { + return channel; + } + + @Override + public void setReadTimeout(int timeoutMs) throws IOException { + enclosedPeer.setReadTimeout(timeoutMs); + } + + @Override + public int getReceiveBufferSize() throws IOException { + return enclosedPeer.getReceiveBufferSize(); + } + + @Override + public boolean getTcpNoDelay() throws IOException { + return enclosedPeer.getTcpNoDelay(); + } + + @Override + public void setWriteTimeout(int timeoutMs) throws IOException { + enclosedPeer.setWriteTimeout(timeoutMs); + } + + @Override + public boolean isClosed() { + return enclosedPeer.isClosed(); + } + + @Override + public void close() throws IOException { + try { + in.close(); + } finally { + try { + out.close(); + } finally { + enclosedPeer.close(); + } + } + } + + @Override + public String getRemoteAddressString() { + return enclosedPeer.getRemoteAddressString(); + } + + @Override + public String getLocalAddressString() { + return enclosedPeer.getLocalAddressString(); + } + + @Override + public InputStream getInputStream() throws IOException { + return in; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return out; + } + + @Override + public boolean isLocal() { + return enclosedPeer.isLocal(); + } + + @Override + public String toString() { + return "EncryptedPeer(" + enclosedPeer + ")"; + } + + @Override + public DomainSocket getDomainSocket() { + return enclosedPeer.getDomainSocket(); + } + + @Override + public boolean hasSecureChannel() { + return true; + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java new file mode 100644 index 0000000..a12a69b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.channels.ReadableByteChannel; + +import org.apache.hadoop.net.SocketInputStream; +import org.apache.hadoop.net.SocketOutputStream; +import org.apache.hadoop.net.unix.DomainSocket; + +/** + * Represents a peer that we communicate with by using non-blocking I/O + * on a Socket. 
+ */ +public class NioInetPeer implements Peer { + private final Socket socket; + + /** + * An InputStream which simulates blocking I/O with timeouts using NIO. + */ + private final SocketInputStream in; + + /** + * An OutputStream which simulates blocking I/O with timeouts using NIO. + */ + private final SocketOutputStream out; + + private final boolean isLocal; + + public NioInetPeer(Socket socket) throws IOException { + this.socket = socket; + this.in = new SocketInputStream(socket.getChannel(), 0); + this.out = new SocketOutputStream(socket.getChannel(), 0); + this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress()); + } + + @Override + public ReadableByteChannel getInputStreamChannel() { + return in; + } + + @Override + public void setReadTimeout(int timeoutMs) throws IOException { + in.setTimeout(timeoutMs); + } + + @Override + public int getReceiveBufferSize() throws IOException { + return socket.getReceiveBufferSize(); + } + + @Override + public boolean getTcpNoDelay() throws IOException { + return socket.getTcpNoDelay(); + } + + @Override + public void setWriteTimeout(int timeoutMs) throws IOException { + out.setTimeout(timeoutMs); + } + + @Override + public boolean isClosed() { + return socket.isClosed(); + } + + @Override + public void close() throws IOException { + // We always close the outermost streams-- in this case, 'in' and 'out' + // Closing either one of these will also close the Socket. 
+ try { + in.close(); + } finally { + out.close(); + } + } + + @Override + public String getRemoteAddressString() { + return socket.getRemoteSocketAddress().toString(); + } + + @Override + public String getLocalAddressString() { + return socket.getLocalSocketAddress().toString(); + } + + @Override + public InputStream getInputStream() throws IOException { + return in; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return out; + } + + @Override + public boolean isLocal() { + return isLocal; + } + + @Override + public String toString() { + return "NioInetPeer(" + socket.toString() + ")"; + } + + @Override + public DomainSocket getDomainSocket() { + return null; + } + + @Override + public boolean hasSecureChannel() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java index 42cf287..3c38d5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java @@ -57,8 +57,8 @@ public interface Peer extends Closeable { * Set the write timeout on this peer. * * Note: this is not honored for BasicInetPeer. - * See {@link BasicSocketPeer#setWriteTimeout} for details. - * + * See {@link BasicInetPeer#setWriteTimeout} for details. + * * @param timeoutMs The timeout in milliseconds. 
*/ public void setWriteTimeout(int timeoutMs) throws IOException; @@ -76,13 +76,13 @@ public interface Peer extends Closeable { public void close() throws IOException; /** - * @return A string representing the remote end of our + * @return A string representing the remote end of our * connection to the peer. */ public String getRemoteAddressString(); /** - * @return A string representing the local end of our + * @return A string representing the local end of our * connection to the peer. */ public String getLocalAddressString(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java new file mode 100644 index 0000000..23407f8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A little struct class to wrap an InputStream and an OutputStream. + */ +@InterfaceAudience.Private +public class IOStreamPair { + public final InputStream in; + public final OutputStream out; + + public IOStreamPair(InputStream in, OutputStream out) { + this.in = in; + this.out = out; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java new file mode 100644 index 0000000..3846f4a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.net.InetAddress; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Class used to indicate whether a channel is trusted or not. + * The default implementation is to return false indicating that + * the channel is not trusted. + * This class can be overridden to provide custom logic to determine + * whether a channel is trusted or not. + * The custom class can be specified via configuration. + * + */ +public class TrustedChannelResolver implements Configurable { + Configuration conf; + + /** + * Returns an instance of TrustedChannelResolver. + * Looks up the configuration to see if there is custom class specified. + * @param conf + * @return TrustedChannelResolver + */ + public static TrustedChannelResolver getInstance(Configuration conf) { + Class<? extends TrustedChannelResolver> clazz = + conf.getClass( + HdfsClientConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS, + TrustedChannelResolver.class, TrustedChannelResolver.class); + return ReflectionUtils.newInstance(clazz, conf); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + /** + * Return boolean value indicating whether a channel is trusted or not + * from a client's perspective. + * @return true if the channel is trusted and false otherwise. 
+ */ + public boolean isTrusted() { + return false; + } + + + /** + * Identify boolean value indicating whether a channel is trusted or not. + * @param peerAddress address of the peer + * @return true if the channel is trusted and false otherwise. + */ + public boolean isTrusted(InetAddress peerAddress) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java new file mode 100644 index 0000000..959cba0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; + +/** + * Creates a new {@link DataEncryptionKey} on demand. + */ +@InterfaceAudience.Private +public interface DataEncryptionKeyFactory { + + /** + * Creates a new DataEncryptionKey. + * + * @return DataEncryptionKey newly created + * @throws IOException for any error + */ + DataEncryptionKey newDataEncryptionKey() throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java new file mode 100644 index 0000000..256caff --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java @@ -0,0 +1,519 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; +import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.security.sasl.Sasl; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CipherOption; +import org.apache.hadoop.crypto.CipherSuite; +import org.apache.hadoop.crypto.CryptoCodec; +import org.apache.hadoop.crypto.CryptoInputStream; +import org.apache.hadoop.crypto.CryptoOutputStream; +import 
org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.security.SaslPropertiesResolver; +import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.common.net.InetAddresses; +import com.google.protobuf.ByteString; + +/** + * Utility methods implementing SASL negotiation for DataTransferProtocol. + */ +@InterfaceAudience.Private +public final class DataTransferSaslUtil { + + private static final Logger LOG = LoggerFactory.getLogger( + DataTransferSaslUtil.class); + + /** + * Delimiter for the three-part SASL username string. + */ + public static final String NAME_DELIMITER = " "; + + /** + * Sent by clients and validated by servers. We use a number that's unlikely + * to ever be sent as the value of the DATA_TRANSFER_VERSION. + */ + public static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; + + /** + * Checks that SASL negotiation has completed for the given participant, and + * the negotiated quality of protection is included in the given SASL + * properties and therefore acceptable. 
+ * + * @param sasl participant to check + * @param saslProps properties of SASL negotiation + * @throws IOException for any error + */ + public static void checkSaslComplete(SaslParticipant sasl, + Map saslProps) throws IOException { + if (!sasl.isComplete()) { + throw new IOException("Failed to complete SASL handshake"); + } + Set requestedQop = ImmutableSet.copyOf(Arrays.asList( + saslProps.get(Sasl.QOP).split(","))); + String negotiatedQop = sasl.getNegotiatedQop(); + LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}", + requestedQop, negotiatedQop); + if (!requestedQop.contains(negotiatedQop)) { + throw new IOException(String.format("SASL handshake completed, but " + + "channel does not have acceptable quality of protection, " + + "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); + } + } + + /** + * Check whether requested SASL Qop contains privacy. + * + * @param saslProps properties of SASL negotiation + * @return boolean true if privacy exists + */ + public static boolean requestedQopContainsPrivacy( + Map saslProps) { + Set requestedQop = ImmutableSet.copyOf(Arrays.asList( + saslProps.get(Sasl.QOP).split(","))); + return requestedQop.contains("auth-conf"); + } + + /** + * Creates SASL properties required for an encrypted SASL negotiation. + * + * @param encryptionAlgorithm to use for SASL negotiation + * @return properties of encrypted SASL negotiation + */ + public static Map createSaslPropertiesForEncryption( + String encryptionAlgorithm) { + Map saslProps = Maps.newHashMapWithExpectedSize(3); + saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); + saslProps.put(Sasl.SERVER_AUTH, "true"); + saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); + return saslProps; + } + + /** + * For an encrypted SASL negotiation, encodes an encryption key to a SASL + * password.
+ * + * @param encryptionKey to encode + * @return key encoded as SASL password + */ + public static char[] encryptionKeyToPassword(byte[] encryptionKey) { + return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8) + .toCharArray(); + } + + /** + * Returns InetAddress from peer. The getRemoteAddressString has the form + * [host][/ip-address]:port. The host may be missing. The IP address (and + * preceding '/') may be missing. The port preceded by ':' is always present. + * + * @param peer connection peer whose remote address string is parsed + * @return InetAddress from peer + */ + public static InetAddress getPeerAddress(Peer peer) { + String remoteAddr = peer.getRemoteAddressString().split(":")[0]; + int slashIdx = remoteAddr.indexOf('/'); + return InetAddresses.forString(slashIdx != -1 ? + remoteAddr.substring(slashIdx + 1, remoteAddr.length()) : + remoteAddr); + } + + /** + * Creates a SaslPropertiesResolver from the given configuration. This method + * works by cloning the configuration, translating configuration properties + * specific to DataTransferProtocol to what SaslPropertiesResolver expects, + * and then delegating to SaslPropertiesResolver for initialization. This + * method returns null if SASL protection has not been configured for + * DataTransferProtocol.
+ * + * @param conf configuration to read + * @return SaslPropertiesResolver for DataTransferProtocol, or null if not + * configured + */ + public static SaslPropertiesResolver getSaslPropertiesResolver( + Configuration conf) { + String qops = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY); + if (qops == null || qops.isEmpty()) { + LOG.debug("DataTransferProtocol not using SaslPropertiesResolver, no " + + "QOP found in configuration for {}", DFS_DATA_TRANSFER_PROTECTION_KEY); + return null; + } + Configuration saslPropsResolverConf = new Configuration(conf); + saslPropsResolverConf.set(HADOOP_RPC_PROTECTION, qops); + Class resolverClass = conf.getClass( + HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS, + SaslPropertiesResolver.class, SaslPropertiesResolver.class); + resolverClass = conf.getClass(DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, + resolverClass, SaslPropertiesResolver.class); + saslPropsResolverConf.setClass(HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS, + resolverClass, SaslPropertiesResolver.class); + SaslPropertiesResolver resolver = SaslPropertiesResolver.getInstance( + saslPropsResolverConf); + LOG.debug("DataTransferProtocol using SaslPropertiesResolver, configured " + + "QOP {} = {}, configured class {} = {}", DFS_DATA_TRANSFER_PROTECTION_KEY, qops, + DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, resolverClass); + return resolver; + } + + /** + * Reads a SASL negotiation message. 
+ * + * @param in stream to read + * @return bytes of SASL negotiation message + * @throws IOException for any error + */ + public static byte[] readSaslMessage(InputStream in) throws IOException { + DataTransferEncryptorMessageProto proto = + DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); + if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + throw new InvalidEncryptionKeyException(proto.getMessage()); + } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { + throw new IOException(proto.getMessage()); + } else { + return proto.getPayload().toByteArray(); + } + } + + /** + * Reads a SASL negotiation message and negotiation cipher options. + * + * @param in stream to read + * @param cipherOptions list to store negotiation cipher options + * @return byte[] SASL negotiation message + * @throws IOException for any error + */ + public static byte[] readSaslMessageAndNegotiationCipherOptions( + InputStream in, List cipherOptions) throws IOException { + DataTransferEncryptorMessageProto proto = + DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); + if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + throw new InvalidEncryptionKeyException(proto.getMessage()); + } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { + throw new IOException(proto.getMessage()); + } else { + List optionProtos = proto.getCipherOptionList(); + if (optionProtos != null) { + for (CipherOptionProto optionProto : optionProtos) { + cipherOptions.add(PBHelperClient.convert(optionProto)); + } + } + return proto.getPayload().toByteArray(); + } + } + + /** + * Negotiate a cipher option which server supports.
+ * + * @param conf the configuration + * @param options the cipher options which client supports + * @return CipherOption negotiated cipher option + */ + public static CipherOption negotiateCipherOption(Configuration conf, + List options) throws IOException { + // Negotiate cipher suites if configured. Currently, the only supported + // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple + // values for future expansion. + String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); + if (cipherSuites == null || cipherSuites.isEmpty()) { + return null; + } + if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { + throw new IOException(String.format("Invalid cipher suite, %s=%s", + DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); + } + if (options != null) { + for (CipherOption option : options) { + CipherSuite suite = option.getCipherSuite(); + if (suite == CipherSuite.AES_CTR_NOPADDING) { + int keyLen = conf.getInt( + DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY, + DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT) / 8; + CryptoCodec codec = CryptoCodec.getInstance(conf, suite); + byte[] inKey = new byte[keyLen]; + byte[] inIv = new byte[suite.getAlgorithmBlockSize()]; + byte[] outKey = new byte[keyLen]; + byte[] outIv = new byte[suite.getAlgorithmBlockSize()]; + codec.generateSecureRandom(inKey); + codec.generateSecureRandom(inIv); + codec.generateSecureRandom(outKey); + codec.generateSecureRandom(outIv); + return new CipherOption(suite, inKey, inIv, outKey, outIv); + } + } + } + return null; + } + + /** + * Send SASL message and negotiated cipher option to client. 
+ * + * @param out stream to receive message + * @param payload to send + * @param option negotiated cipher option + * @throws IOException for any error + */ + public static void sendSaslMessageAndNegotiatedCipherOption( + OutputStream out, byte[] payload, CipherOption option) + throws IOException { + DataTransferEncryptorMessageProto.Builder builder = + DataTransferEncryptorMessageProto.newBuilder(); + + builder.setStatus(DataTransferEncryptorStatus.SUCCESS); + if (payload != null) { + builder.setPayload(ByteString.copyFrom(payload)); + } + if (option != null) { + builder.addCipherOption(PBHelperClient.convert(option)); + } + + DataTransferEncryptorMessageProto proto = builder.build(); + proto.writeDelimitedTo(out); + out.flush(); + } + + /** + * Create IOStreamPair of {@link org.apache.hadoop.crypto.CryptoInputStream} + * and {@link org.apache.hadoop.crypto.CryptoOutputStream} + * + * @param conf the configuration + * @param cipherOption negotiated cipher option + * @param out underlying output stream + * @param in underlying input stream + * @param isServer is server side + * @return IOStreamPair the stream pair + * @throws IOException for any error + */ + public static IOStreamPair createStreamPair(Configuration conf, + CipherOption cipherOption, OutputStream out, InputStream in, + boolean isServer) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating IOStreamPair of CryptoInputStream and " + + "CryptoOutputStream."); + } + CryptoCodec codec = CryptoCodec.getInstance(conf, + cipherOption.getCipherSuite()); + byte[] inKey = cipherOption.getInKey(); + byte[] inIv = cipherOption.getInIv(); + byte[] outKey = cipherOption.getOutKey(); + byte[] outIv = cipherOption.getOutIv(); + InputStream cIn = new CryptoInputStream(in, codec, + isServer ? inKey : outKey, isServer ? inIv : outIv); + OutputStream cOut = new CryptoOutputStream(out, codec, + isServer ? outKey : inKey, isServer ? 
outIv : inIv); + return new IOStreamPair(cIn, cOut); + } + + /** + * Sends a SASL negotiation message indicating an error. + * + * @param out stream to receive message + * @param message to send + * @throws IOException for any error + */ + public static void sendGenericSaslErrorMessage(OutputStream out, + String message) throws IOException { + sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message); + } + + /** + * Sends a SASL negotiation message. + * + * @param out stream to receive message + * @param payload to send + * @throws IOException for any error + */ + public static void sendSaslMessage(OutputStream out, byte[] payload) + throws IOException { + sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null); + } + + /** + * Send a SASL negotiation message and negotiation cipher options to server. + * + * @param out stream to receive message + * @param payload to send + * @param options cipher options to negotiate + * @throws IOException for any error + */ + public static void sendSaslMessageAndNegotiationCipherOptions( + OutputStream out, byte[] payload, List options) + throws IOException { + DataTransferEncryptorMessageProto.Builder builder = + DataTransferEncryptorMessageProto.newBuilder(); + + builder.setStatus(DataTransferEncryptorStatus.SUCCESS); + if (payload != null) { + builder.setPayload(ByteString.copyFrom(payload)); + } + if (options != null) { + builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options)); + } + + DataTransferEncryptorMessageProto proto = builder.build(); + proto.writeDelimitedTo(out); + out.flush(); + } + + /** + * Read SASL message and negotiated cipher option from server. 
+ * + * @param in stream to read + * @return SaslResponseWithNegotiatedCipherOption SASL message and + * negotiated cipher option + * @throws IOException for any error + */ + public static SaslResponseWithNegotiatedCipherOption + readSaslMessageAndNegotiatedCipherOption(InputStream in) + throws IOException { + DataTransferEncryptorMessageProto proto = + DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); + if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + throw new InvalidEncryptionKeyException(proto.getMessage()); + } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { + throw new IOException(proto.getMessage()); + } else { + byte[] response = proto.getPayload().toByteArray(); + List options = PBHelperClient.convertCipherOptionProtos( + proto.getCipherOptionList()); + CipherOption option = null; + if (options != null && !options.isEmpty()) { + option = options.get(0); + } + return new SaslResponseWithNegotiatedCipherOption(response, option); + } + } + + /** + * Encrypt the key and iv of the negotiated cipher option. + * + * @param option negotiated cipher option + * @param sasl SASL participant representing server + * @return CipherOption negotiated cipher option which contains the + * encrypted key and iv + * @throws IOException for any error + */ + public static CipherOption wrap(CipherOption option, SaslParticipant sasl) + throws IOException { + if (option != null) { + byte[] inKey = option.getInKey(); + if (inKey != null) { + inKey = sasl.wrap(inKey, 0, inKey.length); + } + byte[] outKey = option.getOutKey(); + if (outKey != null) { + outKey = sasl.wrap(outKey, 0, outKey.length); + } + return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), + outKey, option.getOutIv()); + } + + return null; + } + + /** + * Decrypt the key and iv of the negotiated cipher option. 
+ * + * @param option negotiated cipher option + * @param sasl SASL participant representing client + * @return CipherOption negotiated cipher option which contains the + * decrypted key and iv + * @throws IOException for any error + */ + public static CipherOption unwrap(CipherOption option, SaslParticipant sasl) + throws IOException { + if (option != null) { + byte[] inKey = option.getInKey(); + if (inKey != null) { + inKey = sasl.unwrap(inKey, 0, inKey.length); + } + byte[] outKey = option.getOutKey(); + if (outKey != null) { + outKey = sasl.unwrap(outKey, 0, outKey.length); + } + return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), + outKey, option.getOutIv()); + } + + return null; + } + + /** + * Sends a SASL negotiation message. + * + * @param out stream to receive message + * @param status negotiation status + * @param payload to send + * @param message to send + * @throws IOException for any error + */ + public static void sendSaslMessage(OutputStream out, + DataTransferEncryptorStatus status, byte[] payload, String message) + throws IOException { + DataTransferEncryptorMessageProto.Builder builder = + DataTransferEncryptorMessageProto.newBuilder(); + + builder.setStatus(status); + if (payload != null) { + builder.setPayload(ByteString.copyFrom(payload)); + } + if (message != null) { + builder.setMessage(message); + } + + DataTransferEncryptorMessageProto proto = builder.build(); + proto.writeDelimitedTo(out); + out.flush(); + } + + /** + * There is no reason to instantiate this class. 
+ */ + private DataTransferSaslUtil() { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java new file mode 100644 index 0000000..913203c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java @@ -0,0 +1,498 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; +import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.RealmChoiceCallback; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CipherOption; +import org.apache.hadoop.crypto.CipherSuite; +import org.apache.hadoop.hdfs.net.EncryptedPeer; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.security.SaslPropertiesResolver; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; + +/** + * 
Negotiates SASL for DataTransferProtocol on behalf of a client. There are + * two possible supported variants of SASL negotiation: either a general-purpose + * negotiation supporting any quality of protection, or a specialized + * negotiation that enforces privacy as the quality of protection using a + * cryptographically strong encryption key. + * + * This class is used in both the HDFS client and the DataNode. The DataNode + * needs it, because it acts as a client to other DataNodes during write + * pipelines and block transfers. + */ +@InterfaceAudience.Private +public class SaslDataTransferClient { + + private static final Logger LOG = LoggerFactory.getLogger( + SaslDataTransferClient.class); + + private final Configuration conf; + private final AtomicBoolean fallbackToSimpleAuth; + private final SaslPropertiesResolver saslPropsResolver; + private final TrustedChannelResolver trustedChannelResolver; + + /** + * Creates a new SaslDataTransferClient. This constructor is used in cases + * where it is not relevant to track if a secure client did a fallback to + * simple auth. For intra-cluster connections between data nodes in the same + * cluster, we can assume that all run under the same security configuration. + * + * @param conf the configuration + * @param saslPropsResolver for determining properties of SASL negotiation + * @param trustedChannelResolver for identifying trusted connections that do + * not require SASL negotiation + */ + public SaslDataTransferClient(Configuration conf, + SaslPropertiesResolver saslPropsResolver, + TrustedChannelResolver trustedChannelResolver) { + this(conf, saslPropsResolver, trustedChannelResolver, null); + } + + /** + * Creates a new SaslDataTransferClient. 
+ * + * @param conf the configuration + * @param saslPropsResolver for determining properties of SASL negotiation + * @param trustedChannelResolver for identifying trusted connections that do + * not require SASL negotiation + * @param fallbackToSimpleAuth checked on each attempt at general SASL + * handshake, if true forces use of simple auth + */ + public SaslDataTransferClient(Configuration conf, + SaslPropertiesResolver saslPropsResolver, + TrustedChannelResolver trustedChannelResolver, + AtomicBoolean fallbackToSimpleAuth) { + this.conf = conf; + this.fallbackToSimpleAuth = fallbackToSimpleAuth; + this.saslPropsResolver = saslPropsResolver; + this.trustedChannelResolver = trustedChannelResolver; + } + + /** + * Sends client SASL negotiation for a newly allocated socket if required. + * + * @param socket connection socket + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param encryptionKeyFactory for creation of an encryption key + * @param accessToken connection block access token + * @param datanodeId ID of destination DataNode + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + public IOStreamPair newSocketSend(Socket socket, OutputStream underlyingOut, + InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory, + Token accessToken, DatanodeID datanodeId) + throws IOException { + // The encryption key factory only returns a key if encryption is enabled. + DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ? + encryptionKeyFactory.newDataEncryptionKey() : null; + IOStreamPair ios = send(socket.getInetAddress(), underlyingOut, + underlyingIn, encryptionKey, accessToken, datanodeId); + return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut); + } + + /** + * Sends client SASL negotiation for a peer if required. 
+ * + * @param peer connection peer + * @param encryptionKeyFactory for creation of an encryption key + * @param accessToken connection block access token + * @param datanodeId ID of destination DataNode + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + public Peer peerSend(Peer peer, DataEncryptionKeyFactory encryptionKeyFactory, + Token accessToken, DatanodeID datanodeId) + throws IOException { + IOStreamPair ios = checkTrustAndSend(getPeerAddress(peer), + peer.getOutputStream(), peer.getInputStream(), encryptionKeyFactory, + accessToken, datanodeId); + // TODO: Consider renaming EncryptedPeer to SaslPeer. + return ios != null ? new EncryptedPeer(peer, ios) : peer; + } + + /** + * Sends client SASL negotiation for a socket if required. + * + * @param socket connection socket + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param encryptionKeyFactory for creation of an encryption key + * @param accessToken connection block access token + * @param datanodeId ID of destination DataNode + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut, + InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory, + Token accessToken, DatanodeID datanodeId) + throws IOException { + IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut, + underlyingIn, encryptionKeyFactory, accessToken, datanodeId); + return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut); + } + + /** + * Checks if an address is already trusted and then sends client SASL + * negotiation if required. 
+ * + * @param addr connection address + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param encryptionKeyFactory for creation of an encryption key + * @param accessToken connection block access token + * @param datanodeId ID of destination DataNode + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair checkTrustAndSend(InetAddress addr, + OutputStream underlyingOut, InputStream underlyingIn, + DataEncryptionKeyFactory encryptionKeyFactory, + Token accessToken, DatanodeID datanodeId) + throws IOException { + if (!trustedChannelResolver.isTrusted() && + !trustedChannelResolver.isTrusted(addr)) { + // The encryption key factory only returns a key if encryption is enabled. + DataEncryptionKey encryptionKey = + encryptionKeyFactory.newDataEncryptionKey(); + return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken, + datanodeId); + } else { + LOG.debug( + "SASL client skipping handshake on trusted connection for addr = {}, " + + "datanodeId = {}", addr, datanodeId); + return null; + } + } + + /** + * Sends client SASL negotiation if required. Determines the correct type of + * SASL handshake based on configuration. 
+ * + * @param addr connection address + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param encryptionKey for an encrypted SASL handshake + * @param accessToken connection block access token + * @param datanodeId ID of destination DataNode + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair send(InetAddress addr, OutputStream underlyingOut, + InputStream underlyingIn, DataEncryptionKey encryptionKey, + Token accessToken, DatanodeID datanodeId) + throws IOException { + if (encryptionKey != null) { + LOG.debug( + "SASL client doing encrypted handshake for addr = {}, datanodeId = {}", + addr, datanodeId); + return getEncryptedStreams(underlyingOut, underlyingIn, + encryptionKey); + } else if (!UserGroupInformation.isSecurityEnabled()) { + LOG.debug( + "SASL client skipping handshake in unsecured configuration for " + + "addr = {}, datanodeId = {}", addr, datanodeId); + return null; + } else if (SecurityUtil.isPrivilegedPort(datanodeId.getXferPort())) { + LOG.debug( + "SASL client skipping handshake in secured configuration with " + + "privileged port for addr = {}, datanodeId = {}", addr, datanodeId); + return null; + } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { + LOG.debug( + "SASL client skipping handshake in secured configuration with " + + "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId); + return null; + } else if (saslPropsResolver != null) { + LOG.debug( + "SASL client doing general handshake for addr = {}, datanodeId = {}", + addr, datanodeId); + return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken, + datanodeId); + } else { + // It's a secured cluster using non-privileged ports, but no SASL. The + // only way this can happen is if the DataNode has + // ignore.secure.ports.for.testing configured, so this is a rare edge case. 
+ LOG.debug( + "SASL client skipping handshake in secured configuration with no SASL " + + "protection configured for addr = {}, datanodeId = {}", + addr, datanodeId); + return null; + } + } + + /** + * Sends client SASL negotiation for specialized encrypted handshake. + * + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param encryptionKey for an encrypted SASL handshake + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair getEncryptedStreams(OutputStream underlyingOut, + InputStream underlyingIn, DataEncryptionKey encryptionKey) + throws IOException { + Map saslProps = createSaslPropertiesForEncryption( + encryptionKey.encryptionAlgorithm); + + LOG.debug("Client using encryption algorithm {}", + encryptionKey.encryptionAlgorithm); + + String userName = getUserNameFromEncryptionKey(encryptionKey); + char[] password = encryptionKeyToPassword(encryptionKey.encryptionKey); + CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName, + password); + return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps, + callbackHandler); + } + + /** + * The SASL username for an encrypted handshake consists of the keyId, + * blockPoolId, and nonce with the first two encoded as Strings, and the third + * encoded using Base64. The fields are each separated by a single space. + * + * @param encryptionKey the encryption key to encode as a SASL username. + * @return encoded username containing keyId, blockPoolId, and nonce + */ + private static String getUserNameFromEncryptionKey( + DataEncryptionKey encryptionKey) { + return encryptionKey.keyId + NAME_DELIMITER + + encryptionKey.blockPoolId + NAME_DELIMITER + + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); + } + + /** + * Sets user name and password when asked by the client-side SASL object. 
+ */ + private static final class SaslClientCallbackHandler + implements CallbackHandler { + + private final char[] password; + private final String userName; + + /** + * Creates a new SaslClientCallbackHandler. + * + * @param userName SASL user name + * @param password SASL password + */ + public SaslClientCallbackHandler(String userName, char[] password) { + this.password = password; + this.userName = userName; + } + + @Override + public void handle(Callback[] callbacks) throws IOException, + UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + RealmCallback rc = null; + for (Callback callback : callbacks) { + if (callback instanceof RealmChoiceCallback) { + continue; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + rc = (RealmCallback) callback; + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL client callback"); + } + } + if (nc != null) { + nc.setName(userName); + } + if (pc != null) { + pc.setPassword(password); + } + if (rc != null) { + rc.setText(rc.getDefaultText()); + } + } + } + + /** + * Sends client SASL negotiation for general-purpose handshake. 
+ * + * @param addr connection address + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param accessToken connection block access token + * @param datanodeId ID of destination DataNode + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair getSaslStreams(InetAddress addr, + OutputStream underlyingOut, InputStream underlyingIn, + Token accessToken, DatanodeID datanodeId) + throws IOException { + Map saslProps = saslPropsResolver.getClientProperties(addr); + + String userName = buildUserName(accessToken); + char[] password = buildClientPassword(accessToken); + CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName, + password); + return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps, + callbackHandler); + } + + /** + * Builds the client's user name for the general-purpose handshake, consisting + * of the base64-encoded serialized block access token identifier. Note that + * this includes only the token identifier, not the token itself, which would + * include the password. The password is a shared secret, and we must not + * write it on the network during the SASL authentication exchange. + * + * @param blockToken for block access + * @return SASL user name + */ + private static String buildUserName(Token blockToken) { + return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), + Charsets.UTF_8); + } + + /** + * Calculates the password on the client side for the general-purpose + * handshake. The password consists of the block access token's password. + * + * @param blockToken for block access + * @return SASL password + */ + private char[] buildClientPassword(Token blockToken) { + return new String(Base64.encodeBase64(blockToken.getPassword(), false), + Charsets.UTF_8).toCharArray(); + } + + /** + * This method actually executes the client-side SASL handshake. 
+ * + * @param underlyingOut connection output stream + * @param underlyingIn connection input stream + * @param userName SASL user name + * @param saslProps properties of SASL negotiation + * @param callbackHandler for responding to SASL callbacks + * @return new pair of streams, wrapped after SASL negotiation + * @throws IOException for any error + */ + private IOStreamPair doSaslHandshake(OutputStream underlyingOut, + InputStream underlyingIn, String userName, Map saslProps, + CallbackHandler callbackHandler) throws IOException { + + DataOutputStream out = new DataOutputStream(underlyingOut); + DataInputStream in = new DataInputStream(underlyingIn); + + SaslParticipant sasl= SaslParticipant.createClientSaslParticipant(userName, + saslProps, callbackHandler); + + out.writeInt(SASL_TRANSFER_MAGIC_NUMBER); + out.flush(); + + try { + // Start of handshake - "initial response" in SASL terminology. + sendSaslMessage(out, new byte[0]); + + // step 1 + byte[] remoteResponse = readSaslMessage(in); + byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); + List cipherOptions = null; + if (requestedQopContainsPrivacy(saslProps)) { + // Negotiate cipher suites if configured. Currently, the only supported + // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple + // values for future expansion. 
+ String cipherSuites = conf.get( + DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); + if (cipherSuites != null && !cipherSuites.isEmpty()) { + if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { + throw new IOException(String.format("Invalid cipher suite, %s=%s", + DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); + } + CipherOption option = new CipherOption(CipherSuite.AES_CTR_NOPADDING); + cipherOptions = Lists.newArrayListWithCapacity(1); + cipherOptions.add(option); + } + } + sendSaslMessageAndNegotiationCipherOptions(out, localResponse, + cipherOptions); + + // step 2 (client-side only) + SaslResponseWithNegotiatedCipherOption response = + readSaslMessageAndNegotiatedCipherOption(in); + localResponse = sasl.evaluateChallengeOrResponse(response.payload); + assert localResponse == null; + + // SASL handshake is complete + checkSaslComplete(sasl, saslProps); + + CipherOption cipherOption = null; + if (sasl.isNegotiatedQopPrivacy()) { + // Unwrap the negotiated cipher option + cipherOption = unwrap(response.cipherOption, sasl); + } + + // If negotiated cipher option is not null, we will use it to create + // stream pair. + return cipherOption != null ? 
createStreamPair( + conf, cipherOption, underlyingOut, underlyingIn, false) : + sasl.createStreamPair(out, in); + } catch (IOException ioe) { + sendGenericSaslErrorMessage(out, ioe.getMessage()); + throw ioe; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java new file mode 100644 index 0000000..f14a075 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.util.Map; +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.security.SaslInputStream; +import org.apache.hadoop.security.SaslOutputStream; + +/** + * Strongly inspired by Thrift's TSaslTransport class. + * + * Used to abstract over the SaslServer and + * SaslClient classes, which share a lot of their interface, but + * unfortunately don't share a common superclass. + */ +@InterfaceAudience.Private +class SaslParticipant { + + // This has to be set as part of the SASL spec. Its value doesn't matter for + // our purposes, but it may not be empty. It's sent over the wire, so use + // a short string. + private static final String SERVER_NAME = "0"; + private static final String PROTOCOL = "hdfs"; + private static final String MECHANISM = "DIGEST-MD5"; + + // One of these will always be null. + private final SaslServer saslServer; + private final SaslClient saslClient; + + /** + * Creates a SaslParticipant wrapping a SaslServer. + * + * @param saslProps properties of SASL negotiation + * @param callbackHandler for handling all SASL callbacks + * @return SaslParticipant wrapping SaslServer + * @throws SaslException for any error + */ + public static SaslParticipant createServerSaslParticipant( + Map saslProps, CallbackHandler callbackHandler) + throws SaslException { + return new SaslParticipant(Sasl.createSaslServer(MECHANISM, + PROTOCOL, SERVER_NAME, saslProps, callbackHandler)); + } + + /** + * Creates a SaslParticipant wrapping a SaslClient. 
+ * + * @param userName SASL user name + * @param saslProps properties of SASL negotiation + * @param callbackHandler for handling all SASL callbacks + * @return SaslParticipant wrapping SaslClient + * @throws SaslException for any error + */ + public static SaslParticipant createClientSaslParticipant(String userName, + Map saslProps, CallbackHandler callbackHandler) + throws SaslException { + return new SaslParticipant(Sasl.createSaslClient(new String[] { MECHANISM }, + userName, PROTOCOL, SERVER_NAME, saslProps, callbackHandler)); + } + + /** + * Private constructor wrapping a SaslServer. + * + * @param saslServer to wrap + */ + private SaslParticipant(SaslServer saslServer) { + this.saslServer = saslServer; + this.saslClient = null; + } + + /** + * Private constructor wrapping a SaslClient. + * + * @param saslClient to wrap + */ + private SaslParticipant(SaslClient saslClient) { + this.saslServer = null; + this.saslClient = saslClient; + } + + /** + * @see {@link SaslServer#evaluateResponse} + * @see {@link SaslClient#evaluateChallenge} + */ + public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) + throws SaslException { + if (saslClient != null) { + return saslClient.evaluateChallenge(challengeOrResponse); + } else { + return saslServer.evaluateResponse(challengeOrResponse); + } + } + + /** + * After successful SASL negotiation, returns the negotiated quality of + * protection. + * + * @return negotiated quality of protection + */ + public String getNegotiatedQop() { + if (saslClient != null) { + return (String) saslClient.getNegotiatedProperty(Sasl.QOP); + } else { + return (String) saslServer.getNegotiatedProperty(Sasl.QOP); + } + } + + /** + * After successful SASL negotiation, returns whether it's QOP privacy + * + * @return boolean whether it's QOP privacy + */ + public boolean isNegotiatedQopPrivacy() { + String qop = getNegotiatedQop(); + return qop != null && "auth-conf".equalsIgnoreCase(qop); + } + + /** + * Wraps a byte array. 
+ * + * @param bytes The array containing the bytes to wrap. + * @param off The starting position at the array + * @param len The number of bytes to wrap + * @return byte[] wrapped bytes + * @throws SaslException if the bytes cannot be successfully wrapped + */ + public byte[] wrap(byte[] bytes, int off, int len) throws SaslException { + if (saslClient != null) { + return saslClient.wrap(bytes, off, len); + } else { + return saslServer.wrap(bytes, off, len); + } + } + + /** + * Unwraps a byte array. + * + * @param bytes The array containing the bytes to unwrap. + * @param off The starting position at the array + * @param len The number of bytes to unwrap + * @return byte[] unwrapped bytes + * @throws SaslException if the bytes cannot be successfully unwrapped + */ + public byte[] unwrap(byte[] bytes, int off, int len) throws SaslException { + if (saslClient != null) { + return saslClient.unwrap(bytes, off, len); + } else { + return saslServer.unwrap(bytes, off, len); + } + } + + /** + * Returns true if SASL negotiation is complete. + * + * @return true if SASL negotiation is complete + */ + public boolean isComplete() { + if (saslClient != null) { + return saslClient.isComplete(); + } else { + return saslServer.isComplete(); + } + } + + /** + * Return some input/output streams that may henceforth have their + * communication encrypted, depending on the negotiated quality of protection. 
+ * + * @param out output stream to wrap + * @param in input stream to wrap + * @return IOStreamPair wrapping the streams + */ + public IOStreamPair createStreamPair(DataOutputStream out, + DataInputStream in) { + if (saslClient != null) { + return new IOStreamPair( + new SaslInputStream(in, saslClient), + new SaslOutputStream(out, saslClient)); + } else { + return new IOStreamPair( + new SaslInputStream(in, saslServer), + new SaslOutputStream(out, saslServer)); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d99018d6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java new file mode 100644 index 0000000..f69441b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.crypto.CipherOption; + +@InterfaceAudience.Private +public class SaslResponseWithNegotiatedCipherOption { + final byte[] payload; + final CipherOption cipherOption; + + public SaslResponseWithNegotiatedCipherOption(byte[] payload, + CipherOption cipherOption) { + this.payload = payload; + this.cipherOption = cipherOption; + } +} \ No newline at end of file