Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C0095C25C for ; Tue, 7 Aug 2012 16:47:19 +0000 (UTC) Received: (qmail 7395 invoked by uid 500); 7 Aug 2012 16:47:19 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 7341 invoked by uid 500); 7 Aug 2012 16:47:19 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 7204 invoked by uid 99); 7 Aug 2012 16:47:19 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Aug 2012 16:47:19 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Aug 2012 16:47:09 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 2697923889ED; Tue, 7 Aug 2012 16:46:14 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1370360 [1/3] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/ma... Date: Tue, 07 Aug 2012 16:46:11 -0000 To: hdfs-commits@hadoop.apache.org From: atm@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120807164614.2697923889ED@eris.apache.org> Author: atm Date: Tue Aug 7 16:46:03 2012 New Revision: 1370360 URL: http://svn.apache.org/viewvc?rev=1370360&view=rev Log: HDFS-3637. Add support for encrypting the DataTransferProtocol. Contributed by Aaron T. Myers. Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/DataEncryptionKey.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithEncryptedTransfer.java Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Aug 7 16:46:03 2012 @@ -21,6 +21,8 @@ Release 2.0.1-alpha - UNRELEASED HDFS-3513. HttpFS should cache filesystems. (tucu) + HDFS-3637. Add support for encrypting the DataTransferProtocol. (atm) + IMPROVEMENTS HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Tue Aug 7 16:46:03 2012 @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.Socket; import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; /** * A BlockReader is responsible for reading a single block @@ -71,4 +72,8 @@ public interface BlockReader extends Byt */ boolean hasSentStatusCode(); + /** + * @return a reference to the streams this block reader is using. + */ + IOStreamPair getStreams(); } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Tue Aug 7 16:46:03 2012 @@ -25,7 +25,12 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; @@ -41,12 +46,13 @@ public class BlockReaderFactory { Configuration conf, Socket sock, String file, ExtendedBlock block, Token blockToken, - long startOffset, long len) throws IOException { + long startOffset, long len, DataEncryptionKey encryptionKey) + throws IOException { int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY, DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT); return newBlockReader(new Conf(conf), sock, file, block, blockToken, startOffset, - len, bufferSize, true, ""); + len, bufferSize, true, "", encryptionKey, null); } /** @@ -73,14 +79,32 @@ public class BlockReaderFactory { Token blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, - String clientName) + String clientName, + DataEncryptionKey encryptionKey, + IOStreamPair ioStreams) throws IOException { + if (conf.useLegacyBlockReader) { + if (encryptionKey != null) { + throw new RuntimeException("Encryption is not supported with the legacy block reader."); + } return RemoteBlockReader.newBlockReader( sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName); } else { + if (ioStreams == null) { + ioStreams = new IOStreamPair(NetUtils.getInputStream(sock), + NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)); + if (encryptionKey != null) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams( + ioStreams.out, ioStreams.in, encryptionKey); + ioStreams = encryptedStreams; + } + } + return RemoteBlockReader2.newBlockReader( - sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName); + sock, file, block, blockToken, startOffset, len, bufferSize, + verifyChecksum, clientName, encryptionKey, ioStreams); } } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Tue Aug 7 16:46:03 2012 @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.B import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.util.DirectBufferPool; @@ -681,4 +682,9 @@ class BlockReaderLocal implements BlockR public boolean hasSentStatusCode() { return false; } + + @Override + public IOStreamPair getStreams() { + return null; + } } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Aug 7 16:46:03 2012 @@ -45,6 +45,8 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; @@ -53,6 +55,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -109,12 +112,15 @@ import org.apache.hadoop.hdfs.protocol.L import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -182,6 +188,7 @@ public class DFSClient implements java.i final Conf dfsClientConf; private Random r = new Random(); private SocketAddress[] localInterfaceAddrs; + private DataEncryptionKey encryptionKey; /** * DFSClient configuration @@ -351,9 +358,6 @@ public class DFSClient implements java.i this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId(); - this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity); - - if (rpcNamenode != null) { // This case is used for testing. Preconditions.checkArgument(nameNodeUri == null); @@ -383,6 +387,8 @@ public class DFSClient implements java.i Joiner.on(',').join(localInterfaces)+ "] with addresses [" + Joiner.on(',').join(localInterfaceAddrs) + "]"); } + + this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity); } /** @@ -1457,7 +1463,44 @@ public class DFSClient implements java.i */ public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException { checkOpen(); - return getFileChecksum(src, namenode, socketFactory, dfsClientConf.socketTimeout); + return getFileChecksum(src, namenode, socketFactory, + dfsClientConf.socketTimeout, getDataEncryptionKey()); + } + + @InterfaceAudience.Private + public void clearDataEncryptionKey() { + LOG.debug("Clearing encryption key"); + synchronized (this) { + encryptionKey = null; + } + } + + /** + * @return true if data sent between this client and DNs should be encrypted, + * false otherwise. + * @throws IOException in the event of error communicating with the NN + */ + boolean shouldEncryptData() throws IOException { + FsServerDefaults d = getServerDefaults(); + return d == null ? false : d.getEncryptDataTransfer(); + } + + @InterfaceAudience.Private + public DataEncryptionKey getDataEncryptionKey() + throws IOException { + if (shouldEncryptData()) { + synchronized (this) { + if (encryptionKey == null || + (encryptionKey != null && + encryptionKey.expiryDate < Time.now())) { + LOG.debug("Getting new encryption token from NN"); + encryptionKey = namenode.getDataEncryptionKey(); + } + return encryptionKey; + } + } else { + return null; + } } /** @@ -1466,8 +1509,8 @@ public class DFSClient implements java.i * @return The checksum */ public static MD5MD5CRC32FileChecksum getFileChecksum(String src, - ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout - ) throws IOException { + ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout, + DataEncryptionKey encryptionKey) throws IOException { //get all block locations LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE); if (null == blockLocations) { @@ -1510,10 +1553,18 @@ public class DFSClient implements java.i timeout); sock.setSoTimeout(timeout); - out = new DataOutputStream( - new BufferedOutputStream(NetUtils.getOutputStream(sock), - HdfsConstants.SMALL_BUFFER_SIZE)); - in = new DataInputStream(NetUtils.getInputStream(sock)); + OutputStream unbufOut = NetUtils.getOutputStream(sock); + InputStream unbufIn = NetUtils.getInputStream(sock); + if (encryptionKey != null) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams( + unbufOut, unbufIn, encryptionKey); + unbufOut = encryptedStreams.out; + unbufIn = encryptedStreams.in; + } + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + HdfsConstants.SMALL_BUFFER_SIZE)); + in = new DataInputStream(unbufIn); if (LOG.isDebugEnabled()) { LOG.debug("write to " + datanodes[j] + ": " Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Aug 7 16:46:03 2012 @@ -367,4 +367,9 @@ public class DFSConfigKeys extends Commo public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false; public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port"; public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019; + + // Security-related configs + public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer"; + public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false; + public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm"; } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Aug 7 16:46:03 2012 @@ -37,11 +37,14 @@ import org.apache.hadoop.fs.ChecksumExce import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; @@ -425,6 +428,7 @@ public class DFSInputStream extends FSIn // DatanodeInfo chosenNode = null; int refetchToken = 1; // only need to get a new access token once + int refetchEncryptionKey = 1; // only need to get a new encryption key once boolean connectFailedOnce = false; @@ -452,7 +456,14 @@ public class DFSInputStream extends FSIn } return chosenNode; } catch (IOException ex) { - if (ex instanceof InvalidBlockTokenException && refetchToken > 0) { + if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + targetAddr + + " : " + ex); + // The encryption key used is invalid. + refetchEncryptionKey--; + dfsClient.clearDataEncryptionKey(); + } else if (ex instanceof InvalidBlockTokenException && refetchToken > 0) { DFSClient.LOG.info("Will fetch a new access token and retry, " + "access token was invalid when connecting to " + targetAddr + " : " + ex); @@ -754,6 +765,7 @@ public class DFSInputStream extends FSIn // Connect to best DataNode for desired Block, with potential offset // int refetchToken = 1; // only need to get a new access token once + int refetchEncryptionKey = 1; // only need to get a new encryption key once while (true) { // cached block locations may have been updated by chooseDataNode() @@ -789,7 +801,14 @@ public class DFSInputStream extends FSIn dfsClient.disableShortCircuit(); continue; } catch (IOException e) { - if (e instanceof InvalidBlockTokenException && refetchToken > 0) { + if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + targetAddr + + " : " + e); + // The encryption key used is invalid. + refetchEncryptionKey--; + dfsClient.clearDataEncryptionKey(); + } else if (e instanceof InvalidBlockTokenException && refetchToken > 0) { DFSClient.LOG.info("Will get a new access token and retry, " + "access token was invalid when connecting to " + targetAddr + " : " + e); @@ -818,8 +837,9 @@ public class DFSInputStream extends FSIn */ private void closeBlockReader(BlockReader reader) throws IOException { if (reader.hasSentStatusCode()) { + IOStreamPair ioStreams = reader.getStreams(); Socket oldSock = reader.takeSocket(); - socketCache.put(oldSock); + socketCache.put(oldSock, ioStreams); } reader.close(); } @@ -864,14 +884,15 @@ public class DFSInputStream extends FSIn // Allow retry since there is no way of knowing whether the cached socket // is good until we actually use it. for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) { - Socket sock = null; + SocketAndStreams sockAndStreams = null; // Don't use the cache on the last attempt - it's possible that there // are arbitrarily many unusable sockets in the cache, but we don't // want to fail the read. if (retries < nCachedConnRetry) { - sock = socketCache.get(dnAddr); + sockAndStreams = socketCache.get(dnAddr); } - if (sock == null) { + Socket sock; + if (sockAndStreams == null) { fromCache = false; sock = dfsClient.socketFactory.createSocket(); @@ -895,6 +916,8 @@ public class DFSInputStream extends FSIn dfsClient.getRandomLocalInterfaceAddr(), dfsClient.getConf().socketTimeout); sock.setSoTimeout(dfsClient.getConf().socketTimeout); + } else { + sock = sockAndStreams.sock; } try { @@ -905,12 +928,18 @@ public class DFSInputStream extends FSIn blockToken, startOffset, len, bufferSize, verifyChecksum, - clientName); + clientName, + dfsClient.getDataEncryptionKey(), + sockAndStreams == null ? null : sockAndStreams.ioStreams); return reader; } catch (IOException ex) { // Our socket is no good. DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex); - sock.close(); + if (sockAndStreams != null) { + sockAndStreams.close(); + } else { + sock.close(); + } err = ex; } } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Aug 7 16:46:03 2012 @@ -24,7 +24,9 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.BufferOverflowException; @@ -56,6 +58,9 @@ import org.apache.hadoop.hdfs.protocol.N import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; @@ -867,16 +872,26 @@ public class DFSOutputStream extends FSO try { sock = createSocketForPipeline(src, 2, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); - out = new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(sock, writeTimeout), + + OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(sock); + if (dfsClient.shouldEncryptData()) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams( + unbufOut, unbufIn, dfsClient.getDataEncryptionKey()); + unbufOut = encryptedStreams.out; + unbufIn = encryptedStreams.in; + } + out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); + in = new DataInputStream(unbufIn); //send the TRANSFER_BLOCK request new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, targets); + out.flush(); //ack - in = new DataInputStream(NetUtils.getInputStream(sock)); BlockOpResponseProto response = BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { @@ -1034,77 +1049,98 @@ public class DFSOutputStream extends FSO // persist blocks on namenode on next flush persistBlocks.set(true); - boolean result = false; - DataOutputStream out = null; - try { - assert null == s : "Previous socket unclosed"; - s = createSocketForPipeline(nodes[0], nodes.length, dfsClient); - long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length); - - // - // Xmit header info to datanode - // - out = new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(s, writeTimeout), - HdfsConstants.SMALL_BUFFER_SIZE)); - - assert null == blockReplyStream : "Previous blockReplyStream unclosed"; - blockReplyStream = new DataInputStream(NetUtils.getInputStream(s)); - - // send the request - new Sender(out).writeBlock(block, accessToken, dfsClient.clientName, - nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, - nodes.length, block.getNumBytes(), bytesSent, newGS, checksum); - - // receive ack for connect - BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - HdfsProtoUtil.vintPrefixed(blockReplyStream)); - pipelineStatus = resp.getStatus(); - firstBadLink = resp.getFirstBadLink(); - - if (pipelineStatus != SUCCESS) { - if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) { - throw new InvalidBlockTokenException( - "Got access token error for connect ack with firstBadLink as " - + firstBadLink); - } else { - throw new IOException("Bad connect ack with firstBadLink as " - + firstBadLink); + int refetchEncryptionKey = 1; + while (true) { + boolean result = false; + DataOutputStream out = null; + try { + assert null == s : "Previous socket unclosed"; + assert null == blockReplyStream : "Previous blockReplyStream unclosed"; + s = createSocketForPipeline(nodes[0], nodes.length, dfsClient); + long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length); + + OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(s); + if (dfsClient.shouldEncryptData()) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams(unbufOut, + unbufIn, dfsClient.getDataEncryptionKey()); + unbufOut = encryptedStreams.out; + unbufIn = encryptedStreams.in; + } + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + HdfsConstants.SMALL_BUFFER_SIZE)); + blockReplyStream = new DataInputStream(unbufIn); + + // + // Xmit header info to datanode + // + + // send the request + new Sender(out).writeBlock(block, accessToken, dfsClient.clientName, + nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, + nodes.length, block.getNumBytes(), bytesSent, newGS, checksum); + + // receive ack for connect + BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( + HdfsProtoUtil.vintPrefixed(blockReplyStream)); + pipelineStatus = resp.getStatus(); + firstBadLink = resp.getFirstBadLink(); + + if (pipelineStatus != SUCCESS) { + if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) { + throw new InvalidBlockTokenException( + "Got access token error for connect ack with firstBadLink as " + + firstBadLink); + } else { + throw new IOException("Bad connect ack with firstBadLink as " + + firstBadLink); + } } - } - assert null == blockStream : "Previous blockStream unclosed"; - blockStream = out; - result = true; // success - - } catch (IOException ie) { - - DFSClient.LOG.info("Exception in createBlockOutputStream", ie); - - // find the datanode that matches - if (firstBadLink.length() != 0) { - for (int i = 0; i < nodes.length; i++) { - if (nodes[i].getXferAddr().equals(firstBadLink)) { - errorIndex = i; - break; + assert null == blockStream : "Previous blockStream unclosed"; + blockStream = out; + result = true; // success + + } catch (IOException ie) { + DFSClient.LOG.info("Exception in createBlockOutputStream", ie); + if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + + nodes[0].getXferAddr() + " : " + ie); + // The encryption key used is invalid. + refetchEncryptionKey--; + dfsClient.clearDataEncryptionKey(); + // Don't close the socket/exclude this node just yet. Try again with + // a new encryption key. + continue; + } + + // find the datanode that matches + if (firstBadLink.length() != 0) { + for (int i = 0; i < nodes.length; i++) { + if (nodes[i].getXferAddr().equals(firstBadLink)) { + errorIndex = i; + break; + } } + } else { + errorIndex = 0; + } + hasError = true; + setLastException(ie); + result = false; // error + } finally { + if (!result) { + IOUtils.closeSocket(s); + s = null; + IOUtils.closeStream(out); + out = null; + IOUtils.closeStream(blockReplyStream); + blockReplyStream = null; } - } else { - errorIndex = 0; - } - hasError = true; - setLastException(ie); - result = false; // error - } finally { - if (!result) { - IOUtils.closeSocket(s); - s = null; - IOUtils.closeStream(out); - out = null; - IOUtils.closeStream(blockReplyStream); - blockReplyStream = null; } + return result; } - return result; } private LocatedBlock locateFollowingBlock(long start, Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Tue Aug 7 16:46:03 2012 @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputCheck import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; @@ -458,7 +459,9 @@ public class RemoteBlockReader extends F void sendReadResult(Socket sock, Status statusCode) { assert !sentStatusCode : "already sent status code to " + sock; try { - RemoteBlockReader2.writeReadResult(sock, statusCode); + RemoteBlockReader2.writeReadResult( + NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT), + statusCode); sentStatusCode = true; } catch (IOException e) { // It's ok not to be able to send this. But something is probably wrong. @@ -484,4 +487,11 @@ public class RemoteBlockReader extends F throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader"); } + @Override + public IOStreamPair getStreams() { + // This class doesn't support encryption, which is the only thing this + // method is used for. See HDFS-3637. + return null; + } + } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Aug 7 16:46:03 2012 @@ -23,6 +23,7 @@ import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; @@ -35,12 +36,15 @@ import org.apache.hadoop.classification. import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -83,7 +87,9 @@ public class RemoteBlockReader2 impleme static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class); - Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. + Socket dnSock; + // for now just sending the status code (e.g. checksumOk) after the read. + private IOStreamPair ioStreams; private final ReadableByteChannel in; private DataChecksum checksum; @@ -206,9 +212,9 @@ public class RemoteBlockReader2 impleme if (bytesNeededToFinish <= 0) { readTrailingEmptyPacket(); if (verifyChecksum) { - sendReadResult(dnSock, Status.CHECKSUM_OK); + sendReadResult(Status.CHECKSUM_OK); } else { - sendReadResult(dnSock, Status.SUCCESS); + sendReadResult(Status.SUCCESS); } } } @@ -292,9 +298,11 @@ public class RemoteBlockReader2 impleme protected RemoteBlockReader2(String file, String bpid, long blockId, ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum, - long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) { + long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock, + IOStreamPair ioStreams) { // Path is used only for printing block and file information in debug this.dnSock = dnSock; + this.ioStreams = ioStreams; this.in = in; this.checksum = checksum; this.verifyChecksum = verifyChecksum; @@ -369,24 +377,23 @@ public class RemoteBlockReader2 impleme * closing our connection (which we will re-open), but won't affect * data correctness. */ - void sendReadResult(Socket sock, Status statusCode) { - assert !sentStatusCode : "already sent status code to " + sock; + void sendReadResult(Status statusCode) { + assert !sentStatusCode : "already sent status code to " + dnSock; try { - writeReadResult(sock, statusCode); + writeReadResult(ioStreams.out, statusCode); sentStatusCode = true; } catch (IOException e) { // It's ok not to be able to send this. But something is probably wrong. LOG.info("Could not send read status (" + statusCode + ") to datanode " + - sock.getInetAddress() + ": " + e.getMessage()); + dnSock.getInetAddress() + ": " + e.getMessage()); } } /** * Serialize the actual read result on the wire. */ - static void writeReadResult(Socket sock, Status statusCode) + static void writeReadResult(OutputStream out, Status statusCode) throws IOException { - OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT); ClientReadStatusProto.newBuilder() .setStatus(statusCode) @@ -434,25 +441,32 @@ public class RemoteBlockReader2 impleme * @param clientName Client name * @return New BlockReader instance, or null on error. */ - public static BlockReader newBlockReader( Socket sock, String file, + public static BlockReader newBlockReader(Socket sock, String file, ExtendedBlock block, Token blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, - String clientName) + String clientName, + DataEncryptionKey encryptionKey, + IOStreamPair ioStreams) throws IOException { + + ReadableByteChannel ch; + if (ioStreams.in instanceof SocketInputWrapper) { + ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel(); + } else { + ch = (ReadableByteChannel) ioStreams.in; + } + // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(sock, - HdfsServerConstants.WRITE_TIMEOUT))); + ioStreams.out)); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len); // - // Get bytes in block, set streams + // Get bytes in block // - SocketInputWrapper sin = NetUtils.getInputStream(sock); - ReadableByteChannel ch = sin.getReadableByteChannel(); - DataInputStream in = new DataInputStream(sin); + DataInputStream in = new DataInputStream(ioStreams.in); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( vintPrefixed(in)); @@ -474,7 +488,8 @@ public class RemoteBlockReader2 impleme } return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), - ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock); + ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock, + ioStreams); } static void checkSuccess( @@ -498,4 +513,9 @@ public class RemoteBlockReader2 impleme } } } + + @Override + public IOStreamPair getStreams() { + return ioStreams; + } } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Tue Aug 7 16:46:03 2012 @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs; +import java.io.Closeable; import java.net.Socket; import java.net.SocketAddress; @@ -29,6 +30,8 @@ import com.google.common.base.Preconditi import com.google.common.collect.LinkedListMultimap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.io.IOUtils; /** @@ -37,7 +40,7 @@ import org.apache.hadoop.io.IOUtils; class SocketCache { static final Log LOG = LogFactory.getLog(SocketCache.class); - private final LinkedListMultimap multimap; + private final LinkedListMultimap multimap; private final int capacity; /** @@ -57,21 +60,21 @@ class SocketCache { * @param remote Remote address the socket is connected to. * @return A socket with unknown state, possibly closed underneath. Or null. */ - public synchronized Socket get(SocketAddress remote) { + public synchronized SocketAndStreams get(SocketAddress remote) { if (capacity <= 0) { // disabled return null; } - List socklist = multimap.get(remote); + List socklist = multimap.get(remote); if (socklist == null) { return null; } - Iterator iter = socklist.iterator(); + Iterator iter = socklist.iterator(); while (iter.hasNext()) { - Socket candidate = iter.next(); + SocketAndStreams candidate = iter.next(); iter.remove(); - if (!candidate.isClosed()) { + if (!candidate.sock.isClosed()) { return candidate; } } @@ -82,10 +85,11 @@ class SocketCache { * Give an unused socket to the cache. * @param sock socket not used by anyone. */ - public synchronized void put(Socket sock) { + public synchronized void put(Socket sock, IOStreamPair ioStreams) { + SocketAndStreams s = new SocketAndStreams(sock, ioStreams); if (capacity <= 0) { // Cache disabled. - IOUtils.closeSocket(sock); + s.close(); return; } @@ -102,7 +106,7 @@ class SocketCache { if (capacity == multimap.size()) { evictOldest(); } - multimap.put(remoteAddr, sock); + multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams)); } public synchronized int size() { @@ -113,23 +117,23 @@ class SocketCache { * Evict the oldest entry in the cache. */ private synchronized void evictOldest() { - Iterator> iter = + Iterator> iter = multimap.entries().iterator(); if (!iter.hasNext()) { throw new IllegalStateException("Cannot evict from empty cache!"); } - Entry entry = iter.next(); + Entry entry = iter.next(); iter.remove(); - Socket sock = entry.getValue(); - IOUtils.closeSocket(sock); + SocketAndStreams s = entry.getValue(); + s.close(); } /** * Empty the cache, and close all sockets. */ public synchronized void clear() { - for (Socket sock : multimap.values()) { - IOUtils.closeSocket(sock); + for (SocketAndStreams s : multimap.values()) { + s.close(); } multimap.clear(); } @@ -138,5 +142,25 @@ class SocketCache { protected void finalize() { clear(); } + + @InterfaceAudience.Private + static class SocketAndStreams implements Closeable { + public final Socket sock; + public final IOStreamPair ioStreams; + + public SocketAndStreams(Socket s, IOStreamPair ioStreams) { + this.sock = s; + this.ioStreams = ioStreams; + } + + @Override + public void close() { + if (ioStreams != null) { + IOUtils.closeStream(ioStreams.in); + IOUtils.closeStream(ioStreams.out); + } + IOUtils.closeSocket(sock); + } + } } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Aug 7 16:46:03 2012 @@ -44,6 +44,7 @@ import org.apache.hadoop.security.Access import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; @@ -941,4 +942,11 @@ public interface ClientProtocol { */ public void cancelDelegationToken(Token token) throws IOException; + + /** + * @return encryption key so a client can encrypt data sent via the + * DataTransferProtocol to/from DataNodes. + * @throws IOException + */ + public DataEncryptionKey getDataEncryptionKey() throws IOException; } Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java?rev=1370360&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java (added) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java Tue Aug 7 16:46:03 2012 @@ -0,0 +1,505 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import java.util.TreeMap; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.RealmChoiceCallback; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; +import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.security.SaslInputStream; +import org.apache.hadoop.security.SaslOutputStream; + +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; + +/** + * A class which, given connected input/output streams, will perform a + * handshake using those streams based on SASL to produce new Input/Output + * streams which will encrypt/decrypt all data written/read from said streams. + * Much of this is inspired by or borrowed from the TSaslTransport in Apache + * Thrift, but with some HDFS-specific tweaks. + */ +@InterfaceAudience.Private +public class DataTransferEncryptor { + + public static final Log LOG = LogFactory.getLog(DataTransferEncryptor.class); + + /** + * Sent by clients and validated by servers. We use a number that's unlikely + * to ever be sent as the value of the DATA_TRANSFER_VERSION. + */ + private static final int ENCRYPTED_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; + + /** + * Delimiter for the three-part SASL username string. + */ + private static final String NAME_DELIMITER = " "; + + // This has to be set as part of the SASL spec, but it don't matter for + // our purposes, but may not be empty. It's sent over the wire, so use + // a short string. + private static final String SERVER_NAME = "0"; + + private static final String PROTOCOL = "hdfs"; + private static final String MECHANISM = "DIGEST-MD5"; + private static final Map SASL_PROPS = new TreeMap(); + + static { + SASL_PROPS.put(Sasl.QOP, "auth-conf"); + SASL_PROPS.put(Sasl.SERVER_AUTH, "true"); + } + + /** + * Factory method for DNs, where the nonce, keyId, and encryption key are not + * yet known. The nonce and keyId will be sent by the client, and the DN + * will then use those pieces of info and the secret key shared with the NN + * to determine the encryptionKey used for the SASL handshake/encryption. + * + * Establishes a secure connection assuming that the party on the other end + * has the same shared secret. This does a SASL connection handshake, but not + * a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with + * auth-conf enabled. In particular, it doesn't support an arbitrary number of + * challenge/response rounds, and we know that the client will never have an + * initial response, so we don't check for one. + * + * @param underlyingOut output stream to write to the other party + * @param underlyingIn input stream to read from the other party + * @param blockPoolTokenSecretManager secret manager capable of constructing + * encryption key based on keyId, blockPoolId, and nonce + * @return a pair of streams which wrap the given streams and encrypt/decrypt + * all data read/written + * @throws IOException in the event of error + */ + public static IOStreamPair getEncryptedStreams( + OutputStream underlyingOut, InputStream underlyingIn, + BlockPoolTokenSecretManager blockPoolTokenSecretManager, + String encryptionAlgorithm) throws IOException { + + DataInputStream in = new DataInputStream(underlyingIn); + DataOutputStream out = new DataOutputStream(underlyingOut); + + Map saslProps = Maps.newHashMap(SASL_PROPS); + saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); + + if (LOG.isDebugEnabled()) { + LOG.debug("Server using encryption algorithm " + encryptionAlgorithm); + } + + SaslParticipant sasl = new SaslParticipant(Sasl.createSaslServer(MECHANISM, + PROTOCOL, SERVER_NAME, saslProps, + new SaslServerCallbackHandler(blockPoolTokenSecretManager))); + + int magicNumber = in.readInt(); + if (magicNumber != ENCRYPTED_TRANSFER_MAGIC_NUMBER) { + throw new InvalidMagicNumberException(magicNumber); + } + try { + // step 1 + performSaslStep1(out, in, sasl); + + // step 2 (server-side only) + byte[] remoteResponse = readSaslMessage(in); + byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); + sendSaslMessage(out, localResponse); + + // SASL handshake is complete + checkSaslComplete(sasl); + + return sasl.createEncryptedStreamPair(out, in); + } catch (IOException ioe) { + if (ioe instanceof SaslException && + ioe.getCause() != null && + ioe.getCause() instanceof InvalidEncryptionKeyException) { + // This could just be because the client is long-lived and hasn't gotten + // a new encryption key from the NN in a while. Upon receiving this + // error, the client will get a new encryption key from the NN and retry + // connecting to this DN. + sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage()); + } else { + sendGenericSaslErrorMessage(out, ioe.getMessage()); + } + throw ioe; + } + } + + /** + * Factory method for clients, where the encryption token is already created. + * + * Establishes a secure connection assuming that the party on the other end + * has the same shared secret. This does a SASL connection handshake, but not + * a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with + * auth-conf enabled. In particular, it doesn't support an arbitrary number of + * challenge/response rounds, and we know that the client will never have an + * initial response, so we don't check for one. + * + * @param underlyingOut output stream to write to the other party + * @param underlyingIn input stream to read from the other party + * @param encryptionKey all info required to establish an encrypted stream + * @return a pair of streams which wrap the given streams and encrypt/decrypt + * all data read/written + * @throws IOException in the event of error + */ + public static IOStreamPair getEncryptedStreams( + OutputStream underlyingOut, InputStream underlyingIn, + DataEncryptionKey encryptionKey) + throws IOException { + + Map saslProps = Maps.newHashMap(SASL_PROPS); + saslProps.put("com.sun.security.sasl.digest.cipher", + encryptionKey.encryptionAlgorithm); + + if (LOG.isDebugEnabled()) { + LOG.debug("Client using encryption algorithm " + + encryptionKey.encryptionAlgorithm); + } + + DataOutputStream out = new DataOutputStream(underlyingOut); + DataInputStream in = new DataInputStream(underlyingIn); + + String userName = getUserNameFromEncryptionKey(encryptionKey); + SaslParticipant sasl = new SaslParticipant(Sasl.createSaslClient( + new String[] { MECHANISM }, userName, PROTOCOL, SERVER_NAME, saslProps, + new SaslClientCallbackHandler(encryptionKey.encryptionKey, userName))); + + out.writeInt(ENCRYPTED_TRANSFER_MAGIC_NUMBER); + out.flush(); + + try { + // Start of handshake - "initial response" in SASL terminology. + sendSaslMessage(out, new byte[0]); + + // step 1 + performSaslStep1(out, in, sasl); + + // step 2 (client-side only) + byte[] remoteResponse = readSaslMessage(in); + byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); + assert localResponse == null; + + // SASL handshake is complete + checkSaslComplete(sasl); + + return sasl.createEncryptedStreamPair(out, in); + } catch (IOException ioe) { + sendGenericSaslErrorMessage(out, ioe.getMessage()); + throw ioe; + } + } + + private static void performSaslStep1(DataOutputStream out, DataInputStream in, + SaslParticipant sasl) throws IOException { + byte[] remoteResponse = readSaslMessage(in); + byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); + sendSaslMessage(out, localResponse); + } + + private static void checkSaslComplete(SaslParticipant sasl) throws IOException { + if (!sasl.isComplete()) { + throw new IOException("Failed to complete SASL handshake"); + } + + if (!sasl.supportsConfidentiality()) { + throw new IOException("SASL handshake completed, but channel does not " + + "support encryption"); + } + } + + private static void sendSaslMessage(DataOutputStream out, byte[] payload) + throws IOException { + sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null); + } + + private static void sendInvalidKeySaslErrorMessage(DataOutputStream out, + String message) throws IOException { + sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null, + message); + } + + private static void sendGenericSaslErrorMessage(DataOutputStream out, + String message) throws IOException { + sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message); + } + + private static void sendSaslMessage(OutputStream out, + DataTransferEncryptorStatus status, byte[] payload, String message) + throws IOException { + DataTransferEncryptorMessageProto.Builder builder = + DataTransferEncryptorMessageProto.newBuilder(); + + builder.setStatus(status); + if (payload != null) { + builder.setPayload(ByteString.copyFrom(payload)); + } + if (message != null) { + builder.setMessage(message); + } + + DataTransferEncryptorMessageProto proto = builder.build(); + proto.writeDelimitedTo(out); + out.flush(); + } + + private static byte[] readSaslMessage(DataInputStream in) throws IOException { + DataTransferEncryptorMessageProto proto = + DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); + if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + throw new InvalidEncryptionKeyException(proto.getMessage()); + } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { + throw new IOException(proto.getMessage()); + } else { + return proto.getPayload().toByteArray(); + } + } + + /** + * Set the encryption key when asked by the server-side SASL object. + */ + private static class SaslServerCallbackHandler implements CallbackHandler { + + private BlockPoolTokenSecretManager blockPoolTokenSecretManager; + + public SaslServerCallbackHandler(BlockPoolTokenSecretManager + blockPoolTokenSecretManager) { + this.blockPoolTokenSecretManager = blockPoolTokenSecretManager; + } + + @Override + public void handle(Callback[] callbacks) throws IOException, + UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + AuthorizeCallback ac = null; + for (Callback callback : callbacks) { + if (callback instanceof AuthorizeCallback) { + ac = (AuthorizeCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof RealmCallback) { + continue; // realm is ignored + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL DIGEST-MD5 Callback: " + callback); + } + } + + if (pc != null) { + byte[] encryptionKey = getEncryptionKeyFromUserName( + blockPoolTokenSecretManager, nc.getDefaultName()); + pc.setPassword(encryptionKeyToPassword(encryptionKey)); + } + + if (ac != null) { + ac.setAuthorized(true); + ac.setAuthorizedID(ac.getAuthorizationID()); + } + + } + + } + + /** + * Set the encryption key when asked by the client-side SASL object. + */ + private static class SaslClientCallbackHandler implements CallbackHandler { + + private byte[] encryptionKey; + private String userName; + + public SaslClientCallbackHandler(byte[] encryptionKey, String userName) { + this.encryptionKey = encryptionKey; + this.userName = userName; + } + + @Override + public void handle(Callback[] callbacks) throws IOException, + UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + RealmCallback rc = null; + for (Callback callback : callbacks) { + if (callback instanceof RealmChoiceCallback) { + continue; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + rc = (RealmCallback) callback; + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL client callback"); + } + } + if (nc != null) { + nc.setName(userName); + } + if (pc != null) { + pc.setPassword(encryptionKeyToPassword(encryptionKey)); + } + if (rc != null) { + rc.setText(rc.getDefaultText()); + } + } + + } + + /** + * The SASL username consists of the keyId, blockPoolId, and nonce with the + * first two encoded as Strings, and the third encoded using Base64. The + * fields are each separated by a single space. + * + * @param encryptionKey the encryption key to encode as a SASL username. + * @return encoded username containing keyId, blockPoolId, and nonce + */ + private static String getUserNameFromEncryptionKey( + DataEncryptionKey encryptionKey) { + return encryptionKey.keyId + NAME_DELIMITER + + encryptionKey.blockPoolId + NAME_DELIMITER + + new String(Base64.encodeBase64(encryptionKey.nonce, false)); + } + + /** + * Given a secret manager and a username encoded as described above, determine + * the encryption key. + * + * @param blockPoolTokenSecretManager to determine the encryption key. + * @param userName containing the keyId, blockPoolId, and nonce. + * @return secret encryption key. + * @throws IOException + */ + private static byte[] getEncryptionKeyFromUserName( + BlockPoolTokenSecretManager blockPoolTokenSecretManager, String userName) + throws IOException { + String[] nameComponents = userName.split(NAME_DELIMITER); + if (nameComponents.length != 3) { + throw new IOException("Provided name '" + userName + "' has " + + nameComponents.length + " components instead of the expected 3."); + } + int keyId = Integer.parseInt(nameComponents[0]); + String blockPoolId = nameComponents[1]; + byte[] nonce = Base64.decodeBase64(nameComponents[2]); + return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId, + blockPoolId, nonce); + } + + private static char[] encryptionKeyToPassword(byte[] encryptionKey) { + return new String(Base64.encodeBase64(encryptionKey, false)).toCharArray(); + } + + /** + * Strongly inspired by Thrift's TSaslTransport class. + * + * Used to abstract over the SaslServer and + * SaslClient classes, which share a lot of their interface, but + * unfortunately don't share a common superclass. + */ + private static class SaslParticipant { + // One of these will always be null. + public SaslServer saslServer; + public SaslClient saslClient; + + public SaslParticipant(SaslServer saslServer) { + this.saslServer = saslServer; + } + + public SaslParticipant(SaslClient saslClient) { + this.saslClient = saslClient; + } + + public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) throws SaslException { + if (saslClient != null) { + return saslClient.evaluateChallenge(challengeOrResponse); + } else { + return saslServer.evaluateResponse(challengeOrResponse); + } + } + + public boolean isComplete() { + if (saslClient != null) + return saslClient.isComplete(); + else + return saslServer.isComplete(); + } + + public boolean supportsConfidentiality() { + String qop = null; + if (saslClient != null) { + qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + } else { + qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); + } + return qop != null && qop.equals("auth-conf"); + } + + // Return some input/output streams that will henceforth have their + // communication encrypted. + private IOStreamPair createEncryptedStreamPair( + DataOutputStream out, DataInputStream in) { + if (saslClient != null) { + return new IOStreamPair( + new SaslInputStream(in, saslClient), + new SaslOutputStream(out, saslClient)); + } else { + return new IOStreamPair( + new SaslInputStream(in, saslServer), + new SaslOutputStream(out, saslServer)); + } + } + } + + @InterfaceAudience.Private + public static class InvalidMagicNumberException extends IOException { + + private static final long serialVersionUID = 1L; + + public InvalidMagicNumberException(int magicNumber) { + super(String.format("Received %x instead of %x from client.", + magicNumber, ENCRYPTED_TRANSFER_MAGIC_NUMBER)); + } + } + +} Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java?rev=1370360&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java (added) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java Tue Aug 7 16:46:03 2012 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A little struct class to wrap an InputStream and an OutputStream. + */ +@InterfaceAudience.Private +public class IOStreamPair { + public final InputStream in; + public final OutputStream out; + + public IOStreamPair(InputStream in, OutputStream out) { + this.in = in; + this.out = out; + } +} \ No newline at end of file Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java?rev=1370360&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java (added) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java Tue Aug 7 16:46:03 2012 @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Encryption key verification failed. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class InvalidEncryptionKeyException extends IOException { + private static final long serialVersionUID = 0l; + + public InvalidEncryptionKeyException() { + super(); + } + + public InvalidEncryptionKeyException(String msg) { + super(msg); + } +} Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Tue Aug 7 16:46:03 2012 @@ -38,10 +38,10 @@ import org.apache.hadoop.hdfs.protocol.p @InterfaceAudience.Private @InterfaceStability.Evolving public abstract class Receiver implements DataTransferProtocol { - protected final DataInputStream in; - - /** Create a receiver for DataTransferProtocol with a socket. */ - protected Receiver(final DataInputStream in) { + protected DataInputStream in; + + /** Initialize a receiver for DataTransferProtocol with a socket. */ + protected void initialize(final DataInputStream in) { this.in = in; } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Tue Aug 7 16:46:03 2012 @@ -58,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder; @@ -127,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.io.Text; @@ -830,4 +833,18 @@ public class ClientNamenodeProtocolServe throw new ServiceException(e); } } + + @Override + public GetDataEncryptionKeyResponseProto getDataEncryptionKey( + RpcController controller, GetDataEncryptionKeyRequestProto request) + throws ServiceException { + try { + DataEncryptionKey encryptionKey = server.getDataEncryptionKey(); + return GetDataEncryptionKeyResponseProto.newBuilder() + .setDataEncryptionKey(PBHelper.convert(encryptionKey)) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1370360&r1=1370359&r2=1370360&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Tue Aug 7 16:46:03 2012 @@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto; @@ -99,6 +100,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; @@ -815,9 +817,22 @@ public class ClientNamenodeProtocolTrans ClientNamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), methodName); } + + @Override + public DataEncryptionKey getDataEncryptionKey() throws IOException { + GetDataEncryptionKeyRequestProto req = GetDataEncryptionKeyRequestProto + .newBuilder().build(); + try { + return PBHelper.convert(rpcProxy.getDataEncryptionKey(null, req) + .getDataEncryptionKey()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } @Override public Object getUnderlyingProxyObject() { return rpcProxy; } + }