hadoop-common-commits mailing list archives

From cdoug...@apache.org
Subject hadoop git commit: HDFS-11026. Convert BlockTokenIdentifier to use Protobuf. Contributed by Ewan Higgs.
Date Mon, 13 Feb 2017 19:30:52 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 646c6d650 -> 4ed33e9ca


HDFS-11026. Convert BlockTokenIdentifier to use Protobuf. Contributed by Ewan Higgs.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ed33e9c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ed33e9c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ed33e9c

Branch: refs/heads/trunk
Commit: 4ed33e9ca3d85568e3904753a3ef61a85f801838
Parents: 646c6d6
Author: Chris Douglas <cdouglas@apache.org>
Authored: Mon Feb 13 11:27:48 2017 -0800
Committer: Chris Douglas <cdouglas@apache.org>
Committed: Mon Feb 13 11:29:18 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  51 ++++
 .../token/block/BlockTokenIdentifier.java       |  89 +++++-
 .../src/main/proto/hdfs.proto                   |  33 +++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +
 .../token/block/BlockTokenSecretManager.java    |  18 +-
 .../hadoop/hdfs/server/balancer/KeyManager.java |   6 +-
 .../server/blockmanagement/BlockManager.java    |   9 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |   5 +-
 .../src/main/resources/hdfs-default.xml         |   9 +
 .../security/token/block/TestBlockToken.java    | 297 +++++++++++++++++--
 10 files changed, 480 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 0180828..ad80bc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -121,9 +121,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmI
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTypeProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenSecretProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto;
@@ -584,6 +586,55 @@ public class PBHelperClient {
     return blockTokens;
   }
 
+  public static AccessModeProto convert(BlockTokenIdentifier.AccessMode aMode) {
+    switch (aMode) {
+    case READ: return AccessModeProto.READ;
+    case WRITE: return AccessModeProto.WRITE;
+    case COPY: return AccessModeProto.COPY;
+    case REPLACE: return AccessModeProto.REPLACE;
+    default:
+      throw new IllegalArgumentException("Unexpected AccessMode: " + aMode);
+    }
+  }
+
+  public static BlockTokenIdentifier.AccessMode convert(
+      AccessModeProto accessModeProto) {
+    switch (accessModeProto) {
+    case READ: return BlockTokenIdentifier.AccessMode.READ;
+    case WRITE: return BlockTokenIdentifier.AccessMode.WRITE;
+    case COPY: return BlockTokenIdentifier.AccessMode.COPY;
+    case REPLACE: return BlockTokenIdentifier.AccessMode.REPLACE;
+    default:
+      throw new IllegalArgumentException("Unexpected AccessModeProto: " +
+          accessModeProto);
+    }
+  }
+
+  public static BlockTokenSecretProto convert(
+      BlockTokenIdentifier blockTokenSecret) {
+    BlockTokenSecretProto.Builder builder =
+        BlockTokenSecretProto.newBuilder();
+    builder.setExpiryDate(blockTokenSecret.getExpiryDate());
+    builder.setKeyId(blockTokenSecret.getKeyId());
+    String userId = blockTokenSecret.getUserId();
+    if (userId != null) {
+      builder.setUserId(userId);
+    }
+
+    String blockPoolId = blockTokenSecret.getBlockPoolId();
+    if (blockPoolId != null) {
+      builder.setBlockPoolId(blockPoolId);
+    }
+
+    builder.setBlockId(blockTokenSecret.getBlockId());
+
+    for (BlockTokenIdentifier.AccessMode aMode :
+        blockTokenSecret.getAccessModes()) {
+      builder.addModes(convert(aMode));
+    }
+    return builder.build();
+  }
+
   static public DatanodeInfo convert(DatanodeInfoProto di) {
     if (di == null) {
       return null;

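As a usage sketch of the converters above - assuming the classes from this
commit are on the classpath; the class name ConvertRoundTrip and the sample
values are illustrative only - an identifier can be round-tripped through
the new BlockTokenSecretProto message:

    import java.util.EnumSet;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenSecretProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;

    public class ConvertRoundTrip {
      public static void main(String[] args) {
        // Build an identifier with the new five-argument constructor; the
        // final flag selects protobuf serialization in write().
        BlockTokenIdentifier id = new BlockTokenIdentifier("hdfs", "pool-1",
            42L, EnumSet.of(BlockTokenIdentifier.AccessMode.READ), true);
        // Identifier -> proto; null userId/blockPoolId are left unset.
        BlockTokenSecretProto proto = PBHelperClient.convert(id);
        // Enum values survive the conversion in both directions.
        BlockTokenIdentifier.AccessMode mode =
            PBHelperClient.convert(proto.getModes(0));
        System.out.println(mode); // READ
      }
    }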
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
index 3f2c9ca..28e7acc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
@@ -19,11 +19,16 @@
 package org.apache.hadoop.hdfs.security.token.block;
 
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.EnumSet;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenSecretProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -44,20 +49,22 @@ public class BlockTokenIdentifier extends TokenIdentifier {
   private String blockPoolId;
   private long blockId;
   private final EnumSet<AccessMode> modes;
+  private boolean useProto;
 
   private byte [] cache;
 
   public BlockTokenIdentifier() {
-    this(null, null, 0, EnumSet.noneOf(AccessMode.class));
+    this(null, null, 0, EnumSet.noneOf(AccessMode.class), false);
   }
 
   public BlockTokenIdentifier(String userId, String bpid, long blockId,
-      EnumSet<AccessMode> modes) {
+      EnumSet<AccessMode> modes, boolean useProto) {
     this.cache = null;
     this.userId = userId;
     this.blockPoolId = bpid;
     this.blockId = blockId;
     this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
+    this.useProto = useProto;
   }
 
   @Override
@@ -144,9 +151,45 @@ public class BlockTokenIdentifier extends TokenIdentifier {
         ^ (blockPoolId == null ? 0 : blockPoolId.hashCode());
   }
 
+  /**
+   * readFields peeks at the first byte of the DataInput and determines if it
+   * was written using WritableUtils ("legacy") or Protobuf. We can do this
+   * because we know the first field is the expiry date.
+   *
+   * In the case of the legacy buffer, the expiry date is a VLong, so the
+   * size (which should always be >1) is encoded in the first byte - which is
+   * always negative due to this encoding. However, null BlockTokenIdentifiers
+   * are sometimes written, so we also need to handle the case where the
+   * first byte is 0.
+   *
+   * In the case of protobuf, the first byte is a type tag for the expiry
+   * date, which is written as <code>(field_number << 3) | wire_type</code>.
+   * So as long as the field_number is less than 16, but also positive, we
+   * know we have a Protobuf.
+   *
+   * @param in <code>DataInput</code> to deserialize this object from.
+   * @throws IOException
+   */
   @Override
   public void readFields(DataInput in) throws IOException {
     this.cache = null;
+
+    final DataInputStream dis = (DataInputStream)in;
+    if (!dis.markSupported()) {
+      throw new IOException("Could not peek first byte.");
+    }
+    dis.mark(1);
+    final byte firstByte = dis.readByte();
+    dis.reset();
+    if (firstByte <= 0) {
+      readFieldsLegacy(dis);
+    } else {
+      readFieldsProtobuf(dis);
+    }
+  }
+
+  @VisibleForTesting
+  void readFieldsLegacy(DataInput in) throws IOException {
     expiryDate = WritableUtils.readVLong(in);
     keyId = WritableUtils.readVInt(in);
     userId = WritableUtils.readString(in);
@@ -157,10 +200,44 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     for (int i = 0; i < length; i++) {
       modes.add(WritableUtils.readEnum(in, AccessMode.class));
     }
+    useProto = false;
+  }
+
+  @VisibleForTesting
+  void readFieldsProtobuf(DataInput in) throws IOException {
+    BlockTokenSecretProto blockTokenSecretProto =
+        BlockTokenSecretProto.parseFrom((DataInputStream)in);
+    expiryDate = blockTokenSecretProto.getExpiryDate();
+    keyId = blockTokenSecretProto.getKeyId();
+    if (blockTokenSecretProto.hasUserId()) {
+      userId = blockTokenSecretProto.getUserId();
+    } else {
+      userId = null;
+    }
+    if (blockTokenSecretProto.hasBlockPoolId()) {
+      blockPoolId = blockTokenSecretProto.getBlockPoolId();
+    } else {
+      blockPoolId = null;
+    }
+    blockId = blockTokenSecretProto.getBlockId();
+    for (int i = 0; i < blockTokenSecretProto.getModesCount(); i++) {
+      AccessModeProto accessModeProto = blockTokenSecretProto.getModes(i);
+      modes.add(PBHelperClient.convert(accessModeProto));
+    }
+    useProto = true;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
+    if (useProto) {
+      writeProtobuf(out);
+    } else {
+      writeLegacy(out);
+    }
+  }
+
+  @VisibleForTesting
+  void writeLegacy(DataOutput out) throws IOException {
     WritableUtils.writeVLong(out, expiryDate);
     WritableUtils.writeVInt(out, keyId);
     WritableUtils.writeString(out, userId);
@@ -172,6 +249,12 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     }
   }
 
+  @VisibleForTesting
+  void writeProtobuf(DataOutput out) throws IOException {
+    BlockTokenSecretProto secret = PBHelperClient.convert(this);
+    out.write(secret.toByteArray());
+  }
+
   @Override
   public byte[] getBytes() {
     if(cache == null) cache = super.getBytes();
@@ -186,4 +269,4 @@ public class BlockTokenIdentifier extends TokenIdentifier {
       return KIND_NAME;
     }
   }
-}
+}
\ No newline at end of file
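
To make the first-byte discrimination in readFields concrete, here is a
minimal sketch, assuming hadoop-common on the classpath (the class name
FirstBytePeek is illustrative): a legacy token starts with a WritableUtils
VLong whose multi-byte encoding yields a negative first byte, while a
protobuf token starts with the positive tag byte of field 1.

    import java.io.IOException;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.WritableUtils;

    public class FirstBytePeek {
      public static void main(String[] args) throws IOException {
        // Legacy tokens begin with a VLong expiry date. Any realistic
        // timestamp is far larger than 127, so WritableUtils emits a
        // negative length-marker byte first.
        DataOutputBuffer dob = new DataOutputBuffer();
        WritableUtils.writeVLong(dob, System.currentTimeMillis());
        System.out.println("legacy first byte: " + dob.getData()[0]); // < 0

        // A protobuf message begins with the tag of its first field:
        // (field_number << 3) | wire_type. For expiryDate (field 1, varint)
        // that is (1 << 3) | 0 = 8, which is positive.
        System.out.println("protobuf first byte: " + ((1 << 3) | 0)); // 8
      }
    }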

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 1414120..8a039d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -514,3 +514,36 @@ message RollingUpgradeStatusProto {
 message StorageUuidsProto {
   repeated string storageUuids = 1;
 }
+
+/**
+ * File access permissions mode.
+ */
+enum AccessModeProto {
+  READ = 1;
+  WRITE = 2;
+  COPY = 3;
+  REPLACE = 4;
+}
+
+/**
+ * Secret information for the BlockKeyProto. This is not sent on the wire as
+ * such, but is used to pack a byte array that is then encrypted and put in
+ * BlockKeyProto.bytes.
+ * When adding further fields, make sure they are optional, as they would
+ * otherwise not be backwards compatible.
+ *
+ * Note: As part of the migration from WritableUtils-based tokens (aka
+ * "legacy") to Protocol Buffers, we use the first byte to determine the
+ * type. If the first byte is <= 0 then it is a legacy token. This means that
+ * when using protobuf tokens, the first field sent must have a `field_number`
+ * less than 16 to make sure that the first byte is positive. Otherwise it
+ * could be parsed as a legacy token. See HDFS-11026 for more discussion.
+ */
+message BlockTokenSecretProto {
+  optional uint64 expiryDate = 1;
+  optional uint32 keyId = 2;
+  optional string userId = 3;
+  optional string blockPoolId = 4;
+  optional uint64 blockId = 5;
+  repeated AccessModeProto modes = 6;
+}
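
The field_number constraint in the note above can be checked directly: the
tag byte is (field_number << 3) | wire_type, and once the field number
reaches 16 the first serialized byte has its high bit set, so a signed Java
byte reads it as negative and the peek logic would classify the token as
legacy. A small sketch (the class name TagByte is illustrative):

    public class TagByte {
      public static void main(String[] args) {
        for (int field : new int[] {1, 15, 16}) {
          // wire_type 0 = varint, as used for expiryDate.
          byte tag = (byte) ((field << 3) | 0);
          System.out.println("field " + field + " -> first byte " + tag);
          // field 1 -> 8 and field 15 -> 120 are read as protobuf;
          // field 16 -> -128 would be misread as a legacy token.
        }
      }
    }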

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 10a521b..cf1d21a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -641,6 +641,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT = 600L;
   public static final String  DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY = "dfs.block.access.token.lifetime";
   public static final long    DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT = 600L;
+  public static final String  DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE = "dfs.block.access.token.protobuf.enable";
+  public static final boolean DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT = false;
 
   public static final String DFS_BLOCK_REPLICATOR_CLASSNAME_KEY = "dfs.block.replicator.classname";
  public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index ba08740..a3100d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -75,6 +75,7 @@ public class BlockTokenSecretManager extends
 
   private final int intRange;
   private final int nnRangeStart;
+  private final boolean useProto;
 
   private final SecureRandom nonceGenerator = new SecureRandom();
 
@@ -83,11 +84,13 @@ public class BlockTokenSecretManager extends
    *
    * @param keyUpdateInterval how often a new key will be generated
    * @param tokenLifetime how long an individual token is valid
+   * @param useProto whether to use the new protobuf-style tokens
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
-      long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
+      long tokenLifetime, String blockPoolId, String encryptionAlgorithm,
+      boolean useProto) {
     this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
-        encryptionAlgorithm, 0, 1);
+        encryptionAlgorithm, 0, 1, useProto);
   }
 
   /**
@@ -102,8 +105,9 @@ public class BlockTokenSecretManager extends
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
       long tokenLifetime, int nnIndex, int numNNs,  String blockPoolId,
-      String encryptionAlgorithm) {
-    this(true, keyUpdateInterval, tokenLifetime, blockPoolId, encryptionAlgorithm, nnIndex, numNNs);
+      String encryptionAlgorithm, boolean useProto) {
+    this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
+        encryptionAlgorithm, nnIndex, numNNs, useProto);
     Preconditions.checkArgument(nnIndex >= 0);
     Preconditions.checkArgument(numNNs > 0);
     setSerialNo(new SecureRandom().nextInt());
@@ -111,7 +115,8 @@ public class BlockTokenSecretManager extends
   }
 
   private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
-      long tokenLifetime, String blockPoolId, String encryptionAlgorithm, int nnIndex, int numNNs) {
+      long tokenLifetime, String blockPoolId, String encryptionAlgorithm,
+      int nnIndex, int numNNs, boolean useProto) {
     this.intRange = Integer.MAX_VALUE / numNNs;
     this.nnRangeStart = intRange * nnIndex;
     this.isMaster = isMaster;
@@ -120,6 +125,7 @@ public class BlockTokenSecretManager extends
     this.allKeys = new HashMap<Integer, BlockKey>();
     this.blockPoolId = blockPoolId;
     this.encryptionAlgorithm = encryptionAlgorithm;
+    this.useProto = useProto;
     generateKeys();
   }
 
@@ -246,7 +252,7 @@ public class BlockTokenSecretManager extends
   public Token<BlockTokenIdentifier> generateToken(String userId,
      ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes) throws IOException {
     BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
-        .getBlockPoolId(), block.getBlockId(), modes);
+        .getBlockPoolId(), block.getBlockId(), modes, useProto);
     return new Token<BlockTokenIdentifier>(id, this);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
index 1c6b352..0aa6fb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
@@ -69,8 +69,12 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
           + ", token lifetime=" + StringUtils.formatTime(tokenLifetime));
       String encryptionAlgorithm = conf.get(
           DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
+      final boolean enableProtobuf = conf.getBoolean(
+          DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
+          DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT);
       this.blockTokenSecretManager = new BlockTokenSecretManager(
-          updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm);
+          updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm,
+          enableProtobuf);
       this.blockTokenSecretManager.addKeys(keys);
 
       // sync block keys with NN more frequently than NN updates its block keys

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 893b12d..5125b33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -542,6 +542,9 @@ public class BlockManager implements BlockStatsMXBean {
     
     String nsId = DFSUtil.getNamenodeNameServiceId(conf);
     boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
+    boolean shouldWriteProtobufToken = conf.getBoolean(
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT);
 
     if (isHaEnabled) {
       // figure out which index we are of the nns
@@ -555,10 +558,12 @@ public class BlockManager implements BlockStatsMXBean {
         nnIndex++;
       }
       return new BlockTokenSecretManager(updateMin * 60 * 1000L,
-          lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null, encryptionAlgorithm);
+          lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null,
+          encryptionAlgorithm, shouldWriteProtobufToken);
     } else {
       return new BlockTokenSecretManager(updateMin*60*1000L,
-          lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm);
+          lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm,
+          shouldWriteProtobufToken);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a6dfa46..9ed80ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1534,9 +1534,12 @@ public class DataNode extends ReconfigurableBase
           + blockKeyUpdateInterval / (60 * 1000)
           + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
           + " min(s)");
+      final boolean enableProtobuf = getConf().getBoolean(
+          DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
+          DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT);
       final BlockTokenSecretManager secretMgr = 
           new BlockTokenSecretManager(0, blockTokenLifetime, blockPoolId,
-              dnConf.encryptionAlgorithm);
+              dnConf.encryptionAlgorithm, enableProtobuf);
       blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2bbc788..03f1a08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -503,6 +503,15 @@
 </property>
 
 <property>
+  <name>dfs.block.access.token.protobuf.enable</name>
+  <value>false</value>
+  <description>
+    If "true", block tokens are written using Protocol Buffers.
+    If "false", block tokens are written using the legacy format.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.data.dir</name>
   <value>file://${hadoop.tmp.dir}/dfs/data</value>
   <description>Determines where on the local filesystem an DFS data node

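A minimal sketch of flipping the new switch programmatically, equivalent to
setting dfs.block.access.token.protobuf.enable in hdfs-site.xml (the class
name EnableProtobufTokens is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class EnableProtobufTokens {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(
            DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE, true);
        // BlockManager, DataNode and the balancer's KeyManager all consult
        // this flag when constructing their BlockTokenSecretManagers.
        System.out.println(conf.getBoolean(
            DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE, false));
      }
    }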
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ed33e9c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index 55e9d30..ecb63ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.security.token.block;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -31,7 +32,10 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Calendar;
 import java.util.EnumSet;
+import java.util.GregorianCalendar;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -57,6 +61,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRep
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
 import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.TestWritable;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -104,7 +110,7 @@ public class TestBlockToken {
   final ExtendedBlock block1 = new ExtendedBlock("0", 0L);
   final ExtendedBlock block2 = new ExtendedBlock("10", 10L);
   final ExtendedBlock block3 = new ExtendedBlock("-10", -108L);
-  
+
   @Before
   public void disableKerberos() {
     Configuration conf = new Configuration();
@@ -128,7 +134,7 @@ public class TestBlockToken {
         InvocationOnMock invocation) throws IOException {
       Object args[] = invocation.getArguments();
       assertEquals(2, args.length);
-      GetReplicaVisibleLengthRequestProto req = 
+      GetReplicaVisibleLengthRequestProto req =
           (GetReplicaVisibleLengthRequestProto) args[1];
       Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
           .getTokenIdentifiers();
@@ -158,11 +164,11 @@ public class TestBlockToken {
     return id;
   }
 
-  @Test
-  public void testWritable() throws Exception {
+  private void testWritable(boolean enableProtobuf) throws Exception {
     TestWritable.testWritable(new BlockTokenIdentifier());
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+        enableProtobuf);
     TestWritable.testWritable(generateTokenId(sm, block1,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)));
     TestWritable.testWritable(generateTokenId(sm, block2,
@@ -171,6 +177,16 @@ public class TestBlockToken {
         EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class)));
   }
 
+  @Test
+  public void testWritableLegacy() throws Exception {
+    testWritable(false);
+  }
+
+  @Test
+  public void testWritableProtobuf() throws Exception {
+    testWritable(true);
+  }
+
   private void tokenGenerationAndVerification(BlockTokenSecretManager master,
       BlockTokenSecretManager slave) throws Exception {
     // single-mode tokens
@@ -198,12 +214,14 @@ public class TestBlockToken {
   }
 
   /** test block key and token handling */
-  @Test
-  public void testBlockTokenSecretManager() throws Exception {
+  private void testBlockTokenSecretManager(boolean enableProtobuf)
+      throws Exception {
     BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+        enableProtobuf);
     BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null,
+        enableProtobuf);
     ExportedBlockKeys keys = masterHandler.exportKeys();
     slaveHandler.addKeys(keys);
     tokenGenerationAndVerification(masterHandler, slaveHandler);
@@ -215,6 +233,16 @@ public class TestBlockToken {
     tokenGenerationAndVerification(masterHandler, slaveHandler);
   }
 
+  @Test
+  public void testBlockTokenSecretManagerLegacy() throws Exception {
+    testBlockTokenSecretManager(false);
+  }
+
+  @Test
+  public void testBlockTokenSecretManagerProtobuf() throws Exception {
+    testBlockTokenSecretManager(true);
+  }
+
   private static Server createMockDatanode(BlockTokenSecretManager sm,
       Token<BlockTokenIdentifier> token, Configuration conf)
       throws IOException, ServiceException {
@@ -223,7 +251,7 @@ public class TestBlockToken {
     BlockTokenIdentifier id = sm.createIdentifier();
     id.readFields(new DataInputStream(new ByteArrayInputStream(token
         .getIdentifier())));
-    
+
     doAnswer(new GetLengthAnswer(sm, id)).when(mockDN)
         .getReplicaVisibleLength(any(RpcController.class),
             any(GetReplicaVisibleLengthRequestProto.class));
@@ -237,14 +265,14 @@ public class TestBlockToken {
         .setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
   }
 
-  @Test
-  public void testBlockTokenRpc() throws Exception {
+  private void testBlockTokenRpc(boolean enableProtobuf) throws Exception {
     Configuration conf = new Configuration();
     conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
     UserGroupInformation.setConfiguration(conf);
-    
+
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+        enableProtobuf);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
 
@@ -270,20 +298,30 @@ public class TestBlockToken {
     }
   }
 
+  @Test
+  public void testBlockTokenRpcLegacy() throws Exception {
+    testBlockTokenRpc(false);
+  }
+
+  @Test
+  public void testBlockTokenRpcProtobuf() throws Exception {
+    testBlockTokenRpc(true);
+  }
+
   /**
    * Test that fast repeated invocations of createClientDatanodeProtocolProxy
    * will not end up using up thousands of sockets. This is a regression test
    * for HDFS-1965.
    */
-  @Test
-  public void testBlockTokenRpcLeak() throws Exception {
+  private void testBlockTokenRpcLeak(boolean enableProtobuf) throws Exception {
     Configuration conf = new Configuration();
     conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
     UserGroupInformation.setConfiguration(conf);
-    
+
     Assume.assumeTrue(FD_DIR.exists());
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+        enableProtobuf);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
 
@@ -334,6 +372,16 @@ public class TestBlockToken {
     RPC.stopProxy(proxyToNoWhere);
   }
 
+  @Test
+  public void testBlockTokenRpcLeakLegacy() throws Exception {
+    testBlockTokenRpcLeak(false);
+  }
+
+  @Test
+  public void testBlockTokenRpcLeakProtobuf() throws Exception {
+    testBlockTokenRpcLeak(true);
+  }
+
   /**
    * @return the current number of file descriptors open by this process.
    */
@@ -344,17 +392,19 @@ public class TestBlockToken {
   /**
    * Test {@link BlockPoolTokenSecretManager}
    */
-  @Test
-  public void testBlockPoolTokenSecretManager() throws Exception {
+  private void testBlockPoolTokenSecretManager(boolean enableProtobuf)
+      throws Exception {
     BlockPoolTokenSecretManager bpMgr = new BlockPoolTokenSecretManager();
 
     // Test BlockPoolSecretManager with upto 10 block pools
     for (int i = 0; i < 10; i++) {
       String bpid = Integer.toString(i);
       BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
-          blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
+          blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+          enableProtobuf);
       BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
-          blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
+          blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null,
+          enableProtobuf);
       bpMgr.addBlockPool(bpid, slaveHandler);
 
       ExportedBlockKeys keys = masterHandler.exportKeys();
@@ -370,20 +420,31 @@ public class TestBlockToken {
     }
   }
 
+  @Test
+  public void testBlockPoolTokenSecretManagerLegacy() throws Exception {
+    testBlockPoolTokenSecretManager(false);
+  }
+
+  @Test
+  public void testBlockPoolTokenSecretManagerProtobuf() throws Exception {
+    testBlockPoolTokenSecretManager(true);
+  }
+
   /**
    * This test writes a file and gets the block locations without closing the
    * file, and tests the block token in the last block. Block token is verified
    * by ensuring it is of correct kind.
-   * 
+   *
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test
-  public void testBlockTokenInLastLocatedBlock() throws IOException,
-      InterruptedException {
+  private void testBlockTokenInLastLocatedBlock(boolean enableProtobuf)
+      throws IOException, InterruptedException {
     Configuration conf = new HdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
+        enableProtobuf);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(1).build();
     cluster.waitActive();
@@ -411,4 +472,188 @@ public class TestBlockToken {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testBlockTokenInLastLocatedBlockLegacy() throws IOException,
+      InterruptedException {
+    testBlockTokenInLastLocatedBlock(false);
+  }
+
+  @Test
+  public void testBlockTokenInLastLocatedBlockProtobuf() throws IOException,
+      InterruptedException {
+    testBlockTokenInLastLocatedBlock(true);
+  }
+
+  @Test
+  public void testLegacyBlockTokenBytesIsLegacy() throws IOException {
+    final boolean useProto = false;
+    BlockTokenSecretManager sm = new BlockTokenSecretManager(
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+        useProto);
+    Token<BlockTokenIdentifier> token = sm.generateToken(block1,
+        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class));
+    final byte[] tokenBytes = token.getIdentifier();
+    BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
+    BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
+    BlockTokenIdentifier readToken = new BlockTokenIdentifier();
+
+    DataInputBuffer dib = new DataInputBuffer();
+
+    dib.reset(tokenBytes, tokenBytes.length);
+    legacyToken.readFieldsLegacy(dib);
+
+    boolean invalidProtobufMessage = false;
+    try {
+      dib.reset(tokenBytes, tokenBytes.length);
+      protobufToken.readFieldsProtobuf(dib);
+    } catch (IOException e) {
+      invalidProtobufMessage = true;
+    }
+    assertTrue(invalidProtobufMessage);
+
+    dib.reset(tokenBytes, tokenBytes.length);
+    readToken.readFields(dib);
+
+    // Using legacy, the token parses as a legacy block token and not a protobuf
+    assertEquals(legacyToken, readToken);
+    assertNotEquals(protobufToken, readToken);
+  }
+
+  @Test
+  public void testEmptyLegacyBlockTokenBytesIsLegacy() throws IOException {
+    BlockTokenIdentifier emptyIdent = new BlockTokenIdentifier();
+    DataOutputBuffer dob = new DataOutputBuffer(4096);
+    DataInputBuffer dib = new DataInputBuffer();
+
+    emptyIdent.writeLegacy(dob);
+    byte[] emptyIdentBytes = Arrays.copyOf(dob.getData(), dob.getLength());
+
+    BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
+    BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
+    BlockTokenIdentifier readToken = new BlockTokenIdentifier();
+
+    dib.reset(emptyIdentBytes, emptyIdentBytes.length);
+    legacyToken.readFieldsLegacy(dib);
+
+    boolean invalidProtobufMessage = false;
+    try {
+      dib.reset(emptyIdentBytes, emptyIdentBytes.length);
+      protobufToken.readFieldsProtobuf(dib);
+    } catch (IOException e) {
+      invalidProtobufMessage = true;
+    }
+    assertTrue(invalidProtobufMessage);
+
+    dib.reset(emptyIdentBytes, emptyIdentBytes.length);
+    readToken.readFields(dib);
+    assertEquals(legacyToken, readToken);
+  }
+
+  @Test
+  public void testProtobufBlockTokenBytesIsProtobuf() throws IOException {
+    final boolean useProto = true;
+    BlockTokenSecretManager sm = new BlockTokenSecretManager(
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
+        useProto);
+    Token<BlockTokenIdentifier> token = sm.generateToken(block1,
+        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class));
+    final byte[] tokenBytes = token.getIdentifier();
+    BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
+    BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
+    BlockTokenIdentifier readToken = new BlockTokenIdentifier();
+
+    DataInputBuffer dib = new DataInputBuffer();
+
+    /* We receive NegativeArraySizeException because we didn't call
+     * readFields and instead try to parse this directly as a legacy
+     * BlockTokenIdentifier.
+     *
+     * Note: because the parsing depends on the expiryDate which is based on
+     * `Time.now()` it can sometimes fail with IOException and sometimes with
+     * NegativeArraySizeException.
+     */
+    boolean invalidLegacyMessage = false;
+    try {
+      dib.reset(tokenBytes, tokenBytes.length);
+      legacyToken.readFieldsLegacy(dib);
+    } catch (IOException | NegativeArraySizeException e) {
+      invalidLegacyMessage = true;
+    }
+    assertTrue(invalidLegacyMessage);
+
+    dib.reset(tokenBytes, tokenBytes.length);
+    protobufToken.readFieldsProtobuf(dib);
+
+    dib.reset(tokenBytes, tokenBytes.length);
+    readToken.readFields(dib);
+
+    // Using protobuf, the token parses as a protobuf and not a legacy block
+    // token
+    assertNotEquals(legacyToken, readToken);
+    assertEquals(protobufToken, readToken);
+  }
+
+  public void testCraftedProtobufBlockTokenIdentifier(
+      BlockTokenIdentifier identifier, boolean expectIOE,
+      boolean expectRTE) throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer(4096);
+    DataInputBuffer dib = new DataInputBuffer();
+
+    identifier.writeProtobuf(dob);
+    byte[] identBytes = Arrays.copyOf(dob.getData(), dob.getLength());
+
+    BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
+    BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
+    BlockTokenIdentifier readToken = new BlockTokenIdentifier();
+
+    boolean invalidLegacyMessage = false;
+    try {
+      dib.reset(identBytes, identBytes.length);
+      legacyToken.readFieldsLegacy(dib);
+    } catch (IOException e) {
+      if (!expectIOE) {
+        fail("Received IOException but it was not expected.");
+      }
+      invalidLegacyMessage = true;
+    } catch (RuntimeException e) {
+      if (!expectRTE) {
+        fail("Received RuntimeException but it was not expected.");
+      }
+      invalidLegacyMessage = true;
+    }
+
+    assertTrue(invalidLegacyMessage);
+
+    dib.reset(identBytes, identBytes.length);
+    protobufToken.readFieldsProtobuf(dib);
+
+    dib.reset(identBytes, identBytes.length);
+    readToken.readFieldsProtobuf(dib);
+    assertEquals(protobufToken, readToken);
+  }
+
+  @Test
+  public void testCraftedProtobufBlockTokenBytesIsProtobuf() throws
+      IOException {
+    // Empty BlockTokenIdentifiers throw IOException
+    BlockTokenIdentifier identifier = new BlockTokenIdentifier();
+    testCraftedProtobufBlockTokenIdentifier(identifier, true, false);
+
+    /* Parsing BlockTokenIdentifier with expiryDate
+     * 2017-02-09 00:12:35,072+0100 will throw IOException.
+     * However, expiryDate of
+     * 2017-02-09 00:12:35,071+0100 will throw NegativeArraySizeException.
+     */
+    Calendar cal = new GregorianCalendar();
+    cal.set(2017, 1, 9, 0, 12, 35);
+    long datetime = cal.getTimeInMillis();
+    datetime = ((datetime / 1000) * 1000); // strip milliseconds.
+    datetime = datetime + 71; // 2017-02-09 00:12:35,071+0100
+    identifier.setExpiryDate(datetime);
+    testCraftedProtobufBlockTokenIdentifier(identifier, false, true);
+    datetime += 1; // 2017-02-09 00:12:35,072+0100
+    identifier.setExpiryDate(datetime);
+    testCraftedProtobufBlockTokenIdentifier(identifier, true, false);
+  }
 }

