From common-commits-return-96352-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Wed Sep 4 00:20:11 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id C6CD9180674 for ; Wed, 4 Sep 2019 02:20:10 +0200 (CEST) Received: (qmail 90371 invoked by uid 500); 4 Sep 2019 03:42:05 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 90143 invoked by uid 99); 4 Sep 2019 03:42:04 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Sep 2019 03:42:04 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id DFE438060C; Wed, 4 Sep 2019 00:20:04 +0000 (UTC) Date: Wed, 04 Sep 2019 00:20:06 +0000 To: "common-commits@hadoop.apache.org" Subject: [hadoop] 04/05: HDFS-13699. Add DFSClient sending handshake token to DataNode, and allow DataNode overwrite downstream QOP. Contributed by Chen Liang. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: cliang@apache.org In-Reply-To: <156755640264.12418.13532799405464652936@gitbox.apache.org> References: <156755640264.12418.13532799405464652936@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: hadoop X-Git-Refname: refs/heads/branch-2 X-Git-Reftype: branch X-Git-Rev: 163cb4d5aebd57c8d047903b218924a34a5a9165 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20190904002004.DFE438060C@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. cliang pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hadoop.git commit 163cb4d5aebd57c8d047903b218924a34a5a9165 Author: Chen Liang AuthorDate: Fri Apr 12 17:37:51 2019 -0700 HDFS-13699. Add DFSClient sending handshake token to DataNode, and allow DataNode overwrite downstream QOP. Contributed by Chen Liang. --- .../hadoop/security/token/SecretManager.java | 2 +- .../hadoop/hdfs/client/HdfsClientConfigKeys.java | 3 + .../datatransfer/sasl/DataTransferSaslUtil.java | 72 +++++++ .../datatransfer/sasl/SaslDataTransferClient.java | 117 +++++++++-- .../src/main/proto/datatransfer.proto | 6 + .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + .../datatransfer/sasl/SaslDataTransferServer.java | 53 ++++- .../apache/hadoop/hdfs/server/datanode/DNConf.java | 6 + .../hadoop/hdfs/server/datanode/DataNode.java | 10 + .../hadoop/hdfs/server/datanode/DataXceiver.java | 16 +- .../src/main/resources/hdfs-default.xml | 22 +++ .../apache/hadoop/hdfs/TestHAAuxiliaryPort.java | 2 +- .../apache/hadoop/hdfs/TestMultipleNNPortQOP.java | 219 +++++++++++++++++++++ 13 files changed, 508 insertions(+), 24 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java index 798c8c9..514806d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java @@ -167,7 +167,7 @@ public abstract class SecretManager { * @param key the secret key * @return the bytes of the generated password */ - protected static byte[] createPassword(byte[] identifier, + public static byte[] createPassword(byte[] identifier, SecretKey key) { Mac mac = threadLocalMac.get(); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index c74ece2..44662d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -171,6 +171,9 @@ public interface HdfsClientConfigKeys { String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites"; + String DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY = + "dfs.encrypt.data.overwrite.downstream.new.qop"; + String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection"; String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = ""; String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java index f4651eb..666a29f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.HandshakeSecretProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.security.SaslPropertiesResolver; @@ -249,6 +250,51 @@ public final class DataTransferSaslUtil { } } + static class SaslMessageWithHandshake { + private final byte[] payload; + private final byte[] secret; + private final String bpid; + + SaslMessageWithHandshake(byte[] payload, byte[] secret, String bpid) { + this.payload = payload; + this.secret = secret; + this.bpid = bpid; + } + + byte[] getPayload() { + return payload; + } + + byte[] getSecret() { + return secret; + } + + String getBpid() { + return bpid; + } + } + + public static SaslMessageWithHandshake readSaslMessageWithHandshakeSecret( + InputStream in) throws IOException { + DataTransferEncryptorMessageProto proto = + DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); + if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + throw new InvalidEncryptionKeyException(proto.getMessage()); + } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { + throw new IOException(proto.getMessage()); + } else { + byte[] payload = proto.getPayload().toByteArray(); + byte[] secret = null; + String bpid = null; + if (proto.hasHandshakeSecret()) { + HandshakeSecretProto handshakeSecret = proto.getHandshakeSecret(); + secret = handshakeSecret.getSecret().toByteArray(); + bpid = handshakeSecret.getBpid(); + } + return new SaslMessageWithHandshake(payload, secret, bpid); + } + } + /** * Negotiate a cipher option which server supports. * @@ -375,6 +421,12 @@ public final class DataTransferSaslUtil { sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null); } + public static void sendSaslMessageHandshakeSecret(OutputStream out, + byte[] payload, byte[] secret, String bpid) throws IOException { + sendSaslMessageHandshakeSecret(out, DataTransferEncryptorStatus.SUCCESS, + payload, null, secret, bpid); + } + /** * Send a SASL negotiation message and negotiation cipher options to server. * @@ -497,6 +549,13 @@ public final class DataTransferSaslUtil { public static void sendSaslMessage(OutputStream out, DataTransferEncryptorStatus status, byte[] payload, String message) throws IOException { + sendSaslMessage(out, status, payload, message, null); + } + + public static void sendSaslMessage(OutputStream out, + DataTransferEncryptorStatus status, byte[] payload, String message, + HandshakeSecretProto handshakeSecret) + throws IOException { DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto.newBuilder(); @@ -507,12 +566,25 @@ public final class DataTransferSaslUtil { if (message != null) { builder.setMessage(message); } + if (handshakeSecret != null) { + builder.setHandshakeSecret(handshakeSecret); + } DataTransferEncryptorMessageProto proto = builder.build(); proto.writeDelimitedTo(out); out.flush(); } + public static void sendSaslMessageHandshakeSecret(OutputStream out, + DataTransferEncryptorStatus status, byte[] payload, String message, + byte[] secret, String bpid) throws IOException { + HandshakeSecretProto.Builder builder = + HandshakeSecretProto.newBuilder(); + builder.setSecret(ByteString.copyFrom(secret)); + builder.setBpid(bpid); + sendSaslMessage(out, status, payload, message, builder.build()); + } + /** * There is no reason to instantiate this class. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java index 7804bec..8d1c7f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java @@ -18,8 +18,11 @@ package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY; import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*; +import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -31,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import javax.crypto.SecretKey; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.NameCallback; @@ -39,6 +43,7 @@ import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.sasl.RealmCallback; import javax.security.sasl.RealmChoiceCallback; +import javax.security.sasl.Sasl; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -54,6 +59,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.security.SaslPropertiesResolver; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +89,10 @@ public class SaslDataTransferClient { private final SaslPropertiesResolver saslPropsResolver; private final TrustedChannelResolver trustedChannelResolver; + // Store the most recent successfully negotiated QOP, + // for testing purpose only + private String targetQOP; + /** * Creates a new SaslDataTransferClient. This constructor is used in cases * where it is not relevant to track if a secure client did a fallback to @@ -140,7 +150,7 @@ public class SaslDataTransferClient { DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ? encryptionKeyFactory.newDataEncryptionKey() : null; IOStreamPair ios = send(socket.getInetAddress(), underlyingOut, - underlyingIn, encryptionKey, accessToken, datanodeId); + underlyingIn, encryptionKey, accessToken, datanodeId, null); return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut); } @@ -180,8 +190,19 @@ public class SaslDataTransferClient { InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory, Token accessToken, DatanodeID datanodeId) throws IOException { + return socketSend(socket, underlyingOut, underlyingIn, encryptionKeyFactory, + accessToken, datanodeId, null); + } + + public IOStreamPair socketSend( + Socket socket, OutputStream underlyingOut, InputStream underlyingIn, + DataEncryptionKeyFactory encryptionKeyFactory, + Token accessToken, DatanodeID datanodeId, + SecretKey secretKey) + throws IOException { IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut, - underlyingIn, encryptionKeyFactory, accessToken, datanodeId); + underlyingIn, encryptionKeyFactory, accessToken, datanodeId, + secretKey); return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut); } @@ -203,13 +224,26 @@ public class SaslDataTransferClient { DataEncryptionKeyFactory encryptionKeyFactory, Token accessToken, DatanodeID datanodeId) throws IOException { - if (!trustedChannelResolver.isTrusted() && - !trustedChannelResolver.isTrusted(addr)) { + return checkTrustAndSend(addr, underlyingOut, underlyingIn, + encryptionKeyFactory, accessToken, datanodeId, null); + } + + private IOStreamPair checkTrustAndSend( + InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn, + DataEncryptionKeyFactory encryptionKeyFactory, + Token accessToken, DatanodeID datanodeId, + SecretKey secretKey) + throws IOException { + boolean localTrusted = trustedChannelResolver.isTrusted(); + boolean remoteTrusted = trustedChannelResolver.isTrusted(addr); + LOG.info("SASL encryption trust check: localHostTrusted = {}, " + + "remoteHostTrusted = {}", localTrusted, remoteTrusted); + if (!localTrusted || !remoteTrusted) { // The encryption key factory only returns a key if encryption is enabled. DataEncryptionKey encryptionKey = encryptionKeyFactory.newDataEncryptionKey(); return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken, - datanodeId); + datanodeId, secretKey); } else { LOG.debug( "SASL client skipping handshake on trusted connection for addr = {}, " @@ -233,13 +267,14 @@ public class SaslDataTransferClient { */ private IOStreamPair send(InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn, DataEncryptionKey encryptionKey, - Token accessToken, DatanodeID datanodeId) + Token accessToken, DatanodeID datanodeId, + SecretKey secretKey) throws IOException { if (encryptionKey != null) { LOG.debug("SASL client doing encrypted handshake for addr = {}, " + "datanodeId = {}", addr, datanodeId); return getEncryptedStreams(addr, underlyingOut, underlyingIn, - encryptionKey); + encryptionKey, accessToken, secretKey); } else if (!UserGroupInformation.isSecurityEnabled()) { LOG.debug("SASL client skipping handshake in unsecured configuration for " + "addr = {}, datanodeId = {}", addr, datanodeId); @@ -260,7 +295,8 @@ public class SaslDataTransferClient { LOG.debug( "SASL client doing general handshake for addr = {}, datanodeId = {}", addr, datanodeId); - return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken); + return getSaslStreams(addr, underlyingOut, underlyingIn, + accessToken, secretKey); } else { // It's a secured cluster using non-privileged ports, but no SASL. The // only way this can happen is if the DataNode has @@ -283,11 +319,20 @@ public class SaslDataTransferClient { * @throws IOException for any error */ private IOStreamPair getEncryptedStreams(InetAddress addr, - OutputStream underlyingOut, - InputStream underlyingIn, DataEncryptionKey encryptionKey) + OutputStream underlyingOut, InputStream underlyingIn, + DataEncryptionKey encryptionKey, + Token accessToken, + SecretKey secretKey) throws IOException { Map saslProps = createSaslPropertiesForEncryption( encryptionKey.encryptionAlgorithm); + if (secretKey != null) { + LOG.debug("DataNode overwriting downstream QOP" + + saslProps.get(Sasl.QOP)); + byte[] newSecret = SecretManager.createPassword(saslProps.get(Sasl.QOP) + .getBytes(Charsets.UTF_8), secretKey); + accessToken.setDNHandshakeSecret(newSecret); + } LOG.debug("Client using encryption algorithm {}", encryptionKey.encryptionAlgorithm); @@ -297,7 +342,7 @@ public class SaslDataTransferClient { CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName, password); return doSaslHandshake(addr, underlyingOut, underlyingIn, userName, - saslProps, callbackHandler); + saslProps, callbackHandler, accessToken); } /** @@ -366,6 +411,11 @@ public class SaslDataTransferClient { } } + @VisibleForTesting + public String getTargetQOP() { + return targetQOP; + } + /** * Sends client SASL negotiation for general-purpose handshake. * @@ -378,16 +428,36 @@ public class SaslDataTransferClient { */ private IOStreamPair getSaslStreams(InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn, - Token accessToken) + Token accessToken, + SecretKey secretKey) throws IOException { Map saslProps = saslPropsResolver.getClientProperties(addr); + // secretKey != null only happens when this is called by DN + // sending to downstream DN. If called from client, this will be null, + // as there is no key for client to generate mac instance. + // So that, if a different QOP is desired for inter-DN communication, + // the check below will use new QOP to create a secret, which includes + // the new QOP. + if (secretKey != null) { + String newQOP = conf + .get(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY); + if (newQOP != null) { + saslProps.put(Sasl.QOP, newQOP); + } + LOG.debug("DataNode overwriting downstream QOP " + + saslProps.get(Sasl.QOP)); + byte[] newSecret = SecretManager.createPassword( + saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8), secretKey); + accessToken.setDNHandshakeSecret(newSecret); + } + targetQOP = saslProps.get(Sasl.QOP); String userName = buildUserName(accessToken); char[] password = buildClientPassword(accessToken); CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName, password); return doSaslHandshake(addr, underlyingOut, underlyingIn, userName, - saslProps, callbackHandler); + saslProps, callbackHandler, accessToken); } /** @@ -431,8 +501,8 @@ public class SaslDataTransferClient { */ private IOStreamPair doSaslHandshake(InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn, String userName, - Map saslProps, - CallbackHandler callbackHandler) throws IOException { + Map saslProps, CallbackHandler callbackHandler, + Token accessToken) throws IOException { DataOutputStream out = new DataOutputStream(underlyingOut); DataInputStream in = new DataInputStream(underlyingIn); @@ -445,7 +515,22 @@ public class SaslDataTransferClient { try { // Start of handshake - "initial response" in SASL terminology. - sendSaslMessage(out, new byte[0]); + // The handshake secret can be null, this happens when client is running + // a new version but the cluster does not have this feature. In which case + // there will be no encrypted secret sent from NN. + byte[] handshakeSecret = accessToken.getDnHandshakeSecret(); + if (handshakeSecret == null || handshakeSecret.length == 0) { + LOG.debug("Handshake secret is null, sending without " + + "handshake secret."); + sendSaslMessage(out, new byte[0]); + } else { + LOG.debug("Sending handshake secret."); + BlockTokenIdentifier identifier = new BlockTokenIdentifier(); + identifier.readFields(new DataInputStream( + new ByteArrayInputStream(accessToken.getIdentifier()))); + String bpid = identifier.getBlockPoolId(); + sendSaslMessageHandshakeSecret(out, new byte[0], handshakeSecret, bpid); + } // step 1 byte[] remoteResponse = readSaslMessage(in); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto index a091d41..135bab1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto @@ -43,6 +43,12 @@ message DataTransferEncryptorMessageProto { optional bytes payload = 2; optional string message = 3; repeated CipherOptionProto cipherOption = 4; + optional HandshakeSecretProto handshakeSecret = 5; +} + +message HandshakeSecretProto { + required bytes secret = 1; + required string bpid = 2; } message BaseHeaderProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index c3eec2b..23198ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -835,6 +835,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // Security-related configs public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer"; + public static final String DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY = + "dfs.encrypt.data.overwrite.downstream.derived.qop"; + public static final boolean DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT = + false; public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false; public static final String DFS_XFRAME_OPTION_ENABLED = "dfs.xframe.enabled"; public static final boolean DFS_XFRAME_OPTION_ENABLED_DEFAULT = true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java index e67d873..eb26e99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java @@ -21,15 +21,18 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSF import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*; +import com.google.common.annotations.VisibleForTesting; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Arrays; import java.util.List; import java.util.Map; +import javax.crypto.SecretKey; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.NameCallback; @@ -37,6 +40,7 @@ import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.RealmCallback; +import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import org.apache.commons.codec.binary.Base64; @@ -48,12 +52,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; +import org.apache.hadoop.hdfs.security.token.block.BlockKey; import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.DNConf; import org.apache.hadoop.security.SaslPropertiesResolver; +import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +85,10 @@ public class SaslDataTransferServer { private final BlockPoolTokenSecretManager blockPoolTokenSecretManager; private final DNConf dnConf; + // Store the most recent successfully negotiated QOP, + // for testing purpose only + private String negotiatedQOP; + /** * Creates a new SaslDataTransferServer. * @@ -337,6 +348,26 @@ public class SaslDataTransferServer { return identifier; } + private String examineSecret(byte[] secret, String bpid) { + BlockKey blockKey = blockPoolTokenSecretManager.get(bpid).getCurrentKey(); + SecretKey secretKey = blockKey.getKey(); + for (SaslRpcServer.QualityOfProtection qop : + SaslRpcServer.QualityOfProtection.values()) { + String qopString = qop.getSaslQop(); + byte[] data = qopString.getBytes(Charsets.UTF_8); + byte[] encryptedData = SecretManager.createPassword(data, secretKey); + if (Arrays.equals(encryptedData, secret)) { + return qopString; + } + } + return null; + } + + @VisibleForTesting + public String getNegotiatedQOP() { + return negotiatedQOP; + } + /** * This method actually executes the server-side SASL handshake. * @@ -355,9 +386,6 @@ public class SaslDataTransferServer { DataInputStream in = new DataInputStream(underlyingIn); DataOutputStream out = new DataOutputStream(underlyingOut); - SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(saslProps, - callbackHandler); - int magicNumber = in.readInt(); if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) { throw new InvalidMagicNumberException(magicNumber, @@ -365,7 +393,23 @@ public class SaslDataTransferServer { } try { // step 1 - byte[] remoteResponse = readSaslMessage(in); + SaslMessageWithHandshake message = readSaslMessageWithHandshakeSecret(in); + byte[] secret = message.getSecret(); + String bpid = message.getBpid(); + if (secret != null || bpid != null) { + // sanity check, if one is null, the other must also not be null + assert(secret != null && bpid != null); + String qop = examineSecret(secret, bpid); + if (qop != null) { + saslProps.put(Sasl.QOP, qop); + } else { + LOG.error("Unable to match secret to a QOP!"); + } + } + SaslParticipant sasl = SaslParticipant.createServerSaslParticipant( + saslProps, callbackHandler); + + byte[] remoteResponse = message.getPayload(); byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); sendSaslMessage(out, localResponse); @@ -379,6 +423,7 @@ public class SaslDataTransferServer { checkSaslComplete(sasl, saslProps); CipherOption cipherOption = null; + negotiatedQOP = sasl.getNegotiatedQop(); if (sasl.isNegotiatedQopPrivacy()) { // Negotiate a cipher option Configuration conf = dnConf.getConf(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 113cdf8..c4fc4c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -32,6 +32,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_P import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; @@ -88,6 +90,7 @@ public class DNConf { final boolean syncOnClose; final boolean encryptDataTransfer; final boolean connectToDnViaHostname; + final boolean overwriteDownstreamDerivedQOP; final long readaheadLength; final long heartBeatInterval; @@ -233,6 +236,9 @@ public class DNConf { this.encryptDataTransfer = getConf().getBoolean( DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); + this.overwriteDownstreamDerivedQOP = getConf().getBoolean( + DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY, + DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT); this.encryptionAlgorithm = getConf().get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConf()); this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 1fab84a..f70e49b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1472,6 +1472,11 @@ public class DataNode extends ReconfigurableBase return UUID.randomUUID().toString(); } + @VisibleForTesting + public SaslDataTransferClient getSaslClient() { + return saslClient; + } + /** * Verify that the DatanodeUuid has been initialized. If this is a new * datanode then we generate a new Datanode Uuid and persist it to disk. @@ -1691,6 +1696,11 @@ public class DataNode extends ReconfigurableBase return streamingAddr.getPort(); } + @VisibleForTesting + public SaslDataTransferServer getSaslServer() { + return saslServer; + } + /** * @return name useful for logging */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 1d8db52..43b9500 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -48,6 +48,9 @@ import java.util.Arrays; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import javax.crypto.SecretKey; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSUtilClient; @@ -72,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCirc import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException; @@ -777,8 +781,16 @@ class DataXceiver extends Receiver implements Runnable { InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock); DataEncryptionKeyFactory keyFactory = datanode.getDataEncryptionKeyFactoryForBlock(block); - IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock, - unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]); + SecretKey secretKey = null; + if (dnConf.overwriteDownstreamDerivedQOP) { + String bpid = block.getBlockPoolId(); + BlockKey blockKey = datanode.blockPoolTokenSecretManager + .get(bpid).getCurrentKey(); + secretKey = blockKey.getKey(); + } + IOStreamPair saslStreams = datanode.saslClient.socketSend( + mirrorSock, unbufMirrorOut, unbufMirrorIn, keyFactory, + blockToken, targets[0], secretKey); unbufMirrorOut = saslStreams.out; unbufMirrorIn = saslStreams.in; mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index fa4ba35..fae39ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4567,4 +4567,26 @@ will use exactly the same QOP NameNode and client has already agreed on. + + + dfs.encrypt.data.overwrite.downstream.derived.qop + false + + A boolean specifies whether DN should overwrite the downstream + QOP in a write pipeline. This is used in the case where client + talks to first DN with a QOP, but inter-DN communication needs to be + using a different QOP. If set to false, the default behaviour is that + inter-DN communication will use the same QOP as client-DN connection. + + + + + dfs.encrypt.data.overwrite.downstream.new.qop + + + When dfs.datanode.overwrite.downstream.derived.qop is set to true, + this configuration specifies the new QOP to be used to overwrite + inter-DN QOP. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java index 867fbac..45ccefa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java @@ -37,7 +37,7 @@ import static org.junit.Assert.assertTrue; */ public class TestHAAuxiliaryPort { @Test - public void testTest() throws Exception { + public void testHAAuxiliaryPort() throws Exception { Configuration conf = new Configuration(); conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0"); conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java new file mode 100644 index 0000000..ca84557 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.net.URI; +import java.util.ArrayList; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY; +import static org.junit.Assert.*; + + +/** + * This test tests access NameNode on different port with different + * configured QOP. + */ +public class TestMultipleNNPortQOP extends SaslDataTransferTestCase { + + private static final Path PATH1 = new Path("/file1"); + private static final Path PATH2 = new Path("/file2"); + private static final Path PATH3 = new Path("/file3"); + private static final int BLOCK_SIZE = 4096; + private static final int NUM_BLOCKS = 3; + + private static HdfsConfiguration clusterConf; + + @Before + public void setup() throws Exception { + clusterConf = createSecureConfig( + "authentication,integrity,privacy"); + clusterConf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, + "12000,12100,12200"); + // explicitly setting service rpc for datanode. This because + // DFSUtil.getNNServiceRpcAddressesForCluster looks up client facing port + // and service port at the same time, and if no setting for service + // rpc, it would return client port, in this case, it will be the + // auxiliary port for data node. Which is not what auxiliary is for. + // setting service rpc port to avoid this. + clusterConf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020"); + clusterConf.set( + CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS, + "org.apache.hadoop.security.IngressPortBasedResolver"); + clusterConf.set("ingress.port.sasl.configured.ports", "12000,12100,12200"); + clusterConf.set("ingress.port.sasl.prop.12000", "authentication"); + clusterConf.set("ingress.port.sasl.prop.12100", "integrity"); + clusterConf.set("ingress.port.sasl.prop.12200", "privacy"); + clusterConf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true); + } + + /** + * Test accessing NameNode from three different ports. + * + * @throws Exception + */ + @Test + public void testMultipleNNPort() throws Exception { + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(clusterConf) + .numDataNodes(3).build(); + + cluster.waitActive(); + HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf); + clientConf.unset( + CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS); + ArrayList dataNodes = cluster.getDataNodes(); + + URI currentURI = cluster.getURI(); + URI uriAuthPort = new URI(currentURI.getScheme() + + "://" + currentURI.getHost() + ":12000"); + URI uriIntegrityPort = new URI(currentURI.getScheme() + + "://" + currentURI.getHost() + ":12100"); + URI uriPrivacyPort = new URI(currentURI.getScheme() + + "://" + currentURI.getHost() + ":12200"); + + clientConf.set(HADOOP_RPC_PROTECTION, "privacy"); + FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf); + doTest(fsPrivacy, PATH1); + for (DataNode dn : dataNodes) { + SaslDataTransferServer saslServer = dn.getSaslServer(); + assertEquals("auth-conf", saslServer.getNegotiatedQOP()); + } + + clientConf.set(HADOOP_RPC_PROTECTION, "integrity"); + FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf); + doTest(fsIntegrity, PATH2); + for (DataNode dn : dataNodes) { + SaslDataTransferServer saslServer = dn.getSaslServer(); + assertEquals("auth-int", saslServer.getNegotiatedQOP()); + } + + clientConf.set(HADOOP_RPC_PROTECTION, "authentication"); + FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf); + doTest(fsAuth, PATH3); + for (DataNode dn : dataNodes) { + SaslDataTransferServer saslServer = dn.getSaslServer(); + assertEquals("auth", saslServer.getNegotiatedQOP()); + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test accessing NameNode from three different ports, tests + * overwriting downstream DN in the pipeline. + * + * @throws Exception + */ + @Test + public void testMultipleNNPortOverwriteDownStream() throws Exception { + clusterConf.set(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY, "auth"); + clusterConf.setBoolean( + DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY, true); + MiniDFSCluster cluster = null; + try { + cluster = + new MiniDFSCluster.Builder(clusterConf).numDataNodes(3).build(); + cluster.waitActive(); + HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf); + clientConf.unset( + CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS); + ArrayList dataNodes = cluster.getDataNodes(); + + URI currentURI = cluster.getURI(); + URI uriAuthPort = + new URI(currentURI.getScheme() + "://" + + currentURI.getHost() + ":12000"); + URI uriIntegrityPort = + new URI(currentURI.getScheme() + "://" + + currentURI.getHost() + ":12100"); + URI uriPrivacyPort = + new URI(currentURI.getScheme() + "://" + + currentURI.getHost() + ":12200"); + + clientConf.set(HADOOP_RPC_PROTECTION, "privacy"); + FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf); + doTest(fsPrivacy, PATH1); + // add a wait so that data has reached not only first DN, + // but also the rest + Thread.sleep(100); + for (int i = 0; i < 2; i++) { + DataNode dn = dataNodes.get(i); + SaslDataTransferClient saslClient = dn.getSaslClient(); + assertEquals("auth", saslClient.getTargetQOP()); + } + + clientConf.set(HADOOP_RPC_PROTECTION, "integrity"); + FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf); + doTest(fsIntegrity, PATH2); + Thread.sleep(100); + for (int i = 0; i < 2; i++) { + DataNode dn = dataNodes.get(i); + SaslDataTransferClient saslClient = dn.getSaslClient(); + assertEquals("auth", saslClient.getTargetQOP()); + } + + clientConf.set(HADOOP_RPC_PROTECTION, "authentication"); + FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf); + doTest(fsAuth, PATH3); + Thread.sleep(100); + for (int i = 0; i < 3; i++) { + DataNode dn = dataNodes.get(i); + SaslDataTransferServer saslServer = dn.getSaslServer(); + assertEquals("auth", saslServer.getNegotiatedQOP()); + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private void doTest(FileSystem fs, Path path) throws Exception { + FileSystemTestHelper.createFile(fs, path, NUM_BLOCKS, BLOCK_SIZE); + assertArrayEquals(FileSystemTestHelper.getFileData(NUM_BLOCKS, BLOCK_SIZE), + DFSTestUtil.readFile(fs, path).getBytes("UTF-8")); + BlockLocation[] blockLocations = fs.getFileBlockLocations(path, 0, + Long.MAX_VALUE); + assertNotNull(blockLocations); + assertEquals(NUM_BLOCKS, blockLocations.length); + for (BlockLocation blockLocation: blockLocations) { + assertNotNull(blockLocation.getHosts()); + assertEquals(3, blockLocation.getHosts().length); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org