hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From waltersu4...@apache.org
Subject hadoop git commit: HDFS-8433. Erasure coding: set blockToken in LocatedStripedBlock. Contributed by Walter Su.
Date Mon, 20 Jul 2015 02:14:12 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 4fdd9abd7 -> 06394e376


HDFS-8433. Erasure coding: set blockToken in LocatedStripedBlock. Contributed by Walter Su.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/06394e37
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/06394e37
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/06394e37

Branch: refs/heads/HDFS-7285
Commit: 06394e37601186d2bcff49ccea00712fda9b3579
Parents: 4fdd9ab
Author: Walter Su <waltersu4549@apache.org>
Authored: Mon Jul 20 10:18:34 2015 +0800
Committer: Walter Su <waltersu4549@apache.org>
Committed: Mon Jul 20 10:18:34 2015 +0800

----------------------------------------------------------------------
 .../hdfs/protocol/LocatedStripedBlock.java      |  16 +
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   2 +
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  15 +-
 .../server/blockmanagement/BlockManager.java    |  21 +-
 .../hadoop/hdfs/util/StripedBlockUtil.java      |  13 +-
 .../hadoop-hdfs/src/main/proto/hdfs.proto       |   3 +
 .../blockmanagement/TestBlockTokenWithDFS.java  | 422 ++++++++++---------
 .../TestBlockTokenWithDFSStriped.java           | 119 ++++++
 8 files changed, 407 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/06394e37/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
index dc5a77f..6e62220 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocol;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
 
 import java.util.Arrays;
 
@@ -32,8 +34,10 @@ import java.util.Arrays;
 @InterfaceStability.Evolving
 public class LocatedStripedBlock extends LocatedBlock {
   private static final int[] EMPTY_INDICES = {};
+  private static final Token<BlockTokenIdentifier> EMPTY_TOKEN = new Token<>();
 
   private int[] blockIndices;
+  private Token<BlockTokenIdentifier>[] blockTokens;
 
   public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs,
       String[] storageIDs, StorageType[] storageTypes, int[] indices,
@@ -46,6 +50,10 @@ public class LocatedStripedBlock extends LocatedBlock {
       this.blockIndices = new int[indices.length];
       System.arraycopy(indices, 0, blockIndices, 0, indices.length);
     }
+    blockTokens = new Token[blockIndices.length];
+    for (int i = 0; i < blockIndices.length; i++) {
+      blockTokens[i] = EMPTY_TOKEN;
+    }
   }
 
   @Override
@@ -67,4 +75,12 @@ public class LocatedStripedBlock extends LocatedBlock {
   public boolean isStriped() {
     return true;
   }
+
+  public Token<BlockTokenIdentifier>[] getBlockTokens() {
+    return blockTokens;
+  }
+
+  public void setBlockTokens(Token<BlockTokenIdentifier>[] tokens) {
+    this.blockTokens = tokens;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06394e37/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index b135c08..4709388 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -362,3 +362,5 @@
 
     HDFS-8787. Erasure coding: rename BlockInfoContiguousUC and BlockInfoStripedUC
     to be consistent with trunk. (zhz)
+
+    HDFS-8433. Erasure coding: set blockToken in LocatedStripedBlock.(waltersu4549)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06394e37/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 6baa005..1bc0964 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -813,9 +813,12 @@ public class PBHelper {
       builder.addAllStorageIDs(Arrays.asList(storageIDs));
     }
     if (b instanceof LocatedStripedBlock) {
-      int[] indices = ((LocatedStripedBlock) b).getBlockIndices();
-      for (int index : indices) {
-        builder.addBlockIndex(index);
+      LocatedStripedBlock sb = (LocatedStripedBlock) b;
+      int[] indices = sb.getBlockIndices();
+      Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens();
+      for (int i = 0; i < indices.length; i++) {
+        builder.addBlockIndex(indices[i]);
+        builder.addBlockTokens(PBHelper.convert(blockTokens[i]));
       }
     }
 
@@ -872,6 +875,12 @@ public class PBHelper {
           storageIDs, storageTypes, indices, proto.getOffset(),
           proto.getCorrupt(),
           cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
+      List<TokenProto> tokenProtos = proto.getBlockTokensList();
+      Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
+      for (int i = 0; i < indices.length; i++) {
+        blockTokens[i] = PBHelper.convert(tokenProtos.get(i));
+      }
+      ((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
     }
     lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06394e37/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index edcc14e..7872baa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -92,6 +92,7 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLengt
 
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.Time;
@@ -989,9 +990,23 @@ public class BlockManager {
       final AccessMode mode) throws IOException {
     if (isBlockTokenEnabled()) {
       // Use cached UGI if serving RPC calls.
-      b.setBlockToken(blockTokenSecretManager.generateToken(
-          NameNode.getRemoteUser().getShortUserName(),
-          b.getBlock(), EnumSet.of(mode)));
+      if (b.isStriped()) {
+        LocatedStripedBlock sb = (LocatedStripedBlock) b;
+        int[] indices = sb.getBlockIndices();
+        Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
+        ExtendedBlock internalBlock = new ExtendedBlock(b.getBlock());
+        for (int i = 0; i < indices.length; i++) {
+          internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]);
+          blockTokens[i] = blockTokenSecretManager.generateToken(
+              NameNode.getRemoteUser().getShortUserName(),
+              internalBlock, EnumSet.of(mode));
+        }
+        sb.setBlockTokens(blockTokens);
+      } else {
+        b.setBlockToken(blockTokenSecretManager.generateToken(
+            NameNode.getRemoteUser().getShortUserName(),
+            b.getBlock(), EnumSet.of(mode)));
+      }
     }    
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06394e37/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 6bd5e1f..9b0939c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -30,8 +30,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.security.token.Token;
 
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -105,17 +107,22 @@ public class StripedBlockUtil {
       int idxInBlockGroup) {
     final ExtendedBlock blk = constructInternalBlock(
         bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
-
+    final LocatedBlock locatedBlock;
     if (idxInReturnedLocs < bg.getLocations().length) {
-      return new LocatedBlock(blk,
+      locatedBlock = new LocatedBlock(blk,
           new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
           new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
           new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
           bg.getStartOffset(), bg.isCorrupt(), null);
     } else {
-      return new LocatedBlock(blk, null, null, null,
+      locatedBlock = new LocatedBlock(blk, null, null, null,
           bg.getStartOffset(), bg.isCorrupt(), null);
     }
+    Token<BlockTokenIdentifier>[] blockTokens = bg.getBlockTokens();
+    if (idxInBlockGroup < blockTokens.length) {
+      locatedBlock.setBlockToken(blockTokens[idxInBlockGroup]);
+    }
+    return locatedBlock;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06394e37/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index e1f944f..d2cb665 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -220,7 +220,10 @@ message LocatedBlockProto {
   repeated bool isCached = 6 [packed=true]; // if a location in locs is cached
   repeated StorageTypeProto storageTypes = 7;
   repeated string storageIDs = 8;
+
+  // striped block related fields
   repeated uint32 blockIndex = 9; // used for striped block to indicate block index for each storage
+  repeated hadoop.common.TokenProto blockTokens = 10; // each internal block has a block token
 }
 
 message DataEncryptionKeyProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06394e37/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
index 43f2992..26ed1fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 
@@ -69,28 +68,32 @@ import org.junit.Test;
 
 public class TestBlockTokenWithDFS {
 
-  private static final int BLOCK_SIZE = 1024;
-  private static final int FILE_SIZE = 2 * BLOCK_SIZE;
+  protected static int BLOCK_SIZE = 1024;
+  protected static int FILE_SIZE = 2 * BLOCK_SIZE;
   private static final String FILE_TO_READ = "/fileToRead.dat";
   private static final String FILE_TO_WRITE = "/fileToWrite.dat";
   private static final String FILE_TO_APPEND = "/fileToAppend.dat";
-  private final byte[] rawData = new byte[FILE_SIZE];
 
   {
     ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  public static byte[] generateBytes(int fileSize){
     Random r = new Random();
+    byte[] rawData = new byte[fileSize];
     r.nextBytes(rawData);
+    return rawData;
   }
 
-  private void createFile(FileSystem fs, Path filename) throws IOException {
+  private void createFile(FileSystem fs, Path filename, byte[] expected) throws IOException {
     FSDataOutputStream out = fs.create(filename);
-    out.write(rawData);
+    out.write(expected);
     out.close();
   }
 
   // read a file using blockSeekTo()
-  private boolean checkFile1(FSDataInputStream in) {
-    byte[] toRead = new byte[FILE_SIZE];
+  private boolean checkFile1(FSDataInputStream in, byte[] expected) {
+    byte[] toRead = new byte[expected.length];
     int totalRead = 0;
     int nRead = 0;
     try {
@@ -101,27 +104,27 @@ public class TestBlockTokenWithDFS {
       return false;
     }
     assertEquals("Cannot read file.", toRead.length, totalRead);
-    return checkFile(toRead);
+    return checkFile(toRead, expected);
   }
 
   // read a file using fetchBlockByteRange()
-  private boolean checkFile2(FSDataInputStream in) {
-    byte[] toRead = new byte[FILE_SIZE];
+  private boolean checkFile2(FSDataInputStream in, byte[] expected) {
+    byte[] toRead = new byte[expected.length];
     try {
       assertEquals("Cannot read file", toRead.length, in.read(0, toRead, 0,
           toRead.length));
     } catch (IOException e) {
       return false;
     }
-    return checkFile(toRead);
+    return checkFile(toRead, expected);
   }
 
-  private boolean checkFile(byte[] fileToCheck) {
-    if (fileToCheck.length != rawData.length) {
+  private boolean checkFile(byte[] fileToCheck, byte[] expected) {
+    if (fileToCheck.length != expected.length) {
       return false;
     }
     for (int i = 0; i < fileToCheck.length; i++) {
-      if (fileToCheck[i] != rawData[i]) {
+      if (fileToCheck[i] != expected[i]) {
         return false;
       }
     }
@@ -137,7 +140,7 @@ public class TestBlockTokenWithDFS {
   }
 
   // try reading a block using a BlockReader directly
-  private static void tryRead(final Configuration conf, LocatedBlock lblock,
+  protected void tryRead(final Configuration conf, LocatedBlock lblock,
       boolean shouldSucceed) {
     InetSocketAddress targetAddr = null;
     IOException ioe = null;
@@ -148,7 +151,7 @@ public class TestBlockTokenWithDFS {
       targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
 
       blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
-          setFileName(BlockReaderFactory.getFileName(targetAddr, 
+          setFileName(BlockReaderFactory.getFileName(targetAddr,
                         "test-blockpoolid", block.getBlockId())).
           setBlock(block).
           setBlockToken(lblock.getBlockToken()).
@@ -205,7 +208,7 @@ public class TestBlockTokenWithDFS {
   }
 
   // get a conf for testing
-  private static Configuration getConf(int numDataNodes) {
+  protected Configuration getConf(int numDataNodes) {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@@ -241,16 +244,16 @@ public class TestBlockTokenWithDFS {
       SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
       Path fileToAppend = new Path(FILE_TO_APPEND);
       FileSystem fs = cluster.getFileSystem();
-
+      byte[] expected = generateBytes(FILE_SIZE);
       // write a one-byte file
       FSDataOutputStream stm = writeFile(fs, fileToAppend,
           (short) numDataNodes, BLOCK_SIZE);
-      stm.write(rawData, 0, 1);
+      stm.write(expected, 0, 1);
       stm.close();
       // open the file again for append
       stm = fs.append(fileToAppend);
-      int mid = rawData.length - 1;
-      stm.write(rawData, 1, mid - 1);
+      int mid = expected.length - 1;
+      stm.write(expected, 1, mid - 1);
       stm.hflush();
 
       /*
@@ -267,11 +270,11 @@ public class TestBlockTokenWithDFS {
       // remove a datanode to force re-establishing pipeline
       cluster.stopDataNode(0);
       // append the rest of the file
-      stm.write(rawData, mid, rawData.length - mid);
+      stm.write(expected, mid, expected.length - mid);
       stm.close();
       // check if append is successful
       FSDataInputStream in5 = fs.open(fileToAppend);
-      assertTrue(checkFile1(in5));
+      assertTrue(checkFile1(in5, expected));
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -303,11 +306,12 @@ public class TestBlockTokenWithDFS {
       Path fileToWrite = new Path(FILE_TO_WRITE);
       FileSystem fs = cluster.getFileSystem();
 
+      byte[] expected = generateBytes(FILE_SIZE);
       FSDataOutputStream stm = writeFile(fs, fileToWrite, (short) numDataNodes,
           BLOCK_SIZE);
       // write a partial block
-      int mid = rawData.length - 1;
-      stm.write(rawData, 0, mid);
+      int mid = expected.length - 1;
+      stm.write(expected, 0, mid);
       stm.hflush();
 
       /*
@@ -324,11 +328,11 @@ public class TestBlockTokenWithDFS {
       // remove a datanode to force re-establishing pipeline
       cluster.stopDataNode(0);
       // write the rest of the file
-      stm.write(rawData, mid, rawData.length - mid);
+      stm.write(expected, mid, expected.length - mid);
       stm.close();
       // check if write is successful
       FSDataInputStream in4 = fs.open(fileToWrite);
-      assertTrue(checkFile1(in4));
+      assertTrue(checkFile1(in4, expected));
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -346,125 +350,137 @@ public class TestBlockTokenWithDFS {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
       cluster.waitActive();
       assertEquals(numDataNodes, cluster.getDataNodes().size());
+      doTestRead(conf, cluster, false);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 
-      final NameNode nn = cluster.getNameNode();
-      final NamenodeProtocols nnProto = nn.getRpcServer();
-      final BlockManager bm = nn.getNamesystem().getBlockManager();
-      final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
+  protected void doTestRead(Configuration conf, MiniDFSCluster cluster,
+      boolean isStriped) throws Exception {
+    final int numDataNodes = cluster.getDataNodes().size();
+    final NameNode nn = cluster.getNameNode();
+    final NamenodeProtocols nnProto = nn.getRpcServer();
+    final BlockManager bm = nn.getNamesystem().getBlockManager();
+    final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
 
-      // set a short token lifetime (1 second) initially
-      SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
+    // set a short token lifetime (1 second) initially
+    SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
 
-      Path fileToRead = new Path(FILE_TO_READ);
-      FileSystem fs = cluster.getFileSystem();
-      createFile(fs, fileToRead);
+    Path fileToRead = new Path(FILE_TO_READ);
+    FileSystem fs = cluster.getFileSystem();
+    byte[] expected = generateBytes(FILE_SIZE);
+    createFile(fs, fileToRead, expected);
 
       /*
        * setup for testing expiration handling of cached tokens
        */
 
-      // read using blockSeekTo(). Acquired tokens are cached in in1
-      FSDataInputStream in1 = fs.open(fileToRead);
-      assertTrue(checkFile1(in1));
-      // read using blockSeekTo(). Acquired tokens are cached in in2
-      FSDataInputStream in2 = fs.open(fileToRead);
-      assertTrue(checkFile1(in2));
-      // read using fetchBlockByteRange(). Acquired tokens are cached in in3
-      FSDataInputStream in3 = fs.open(fileToRead);
-      assertTrue(checkFile2(in3));
+    // read using blockSeekTo(). Acquired tokens are cached in in1
+    FSDataInputStream in1 = fs.open(fileToRead);
+    assertTrue(checkFile1(in1,expected));
+    // read using blockSeekTo(). Acquired tokens are cached in in2
+    FSDataInputStream in2 = fs.open(fileToRead);
+    assertTrue(checkFile1(in2,expected));
+    // read using fetchBlockByteRange(). Acquired tokens are cached in in3
+    FSDataInputStream in3 = fs.open(fileToRead);
+    assertTrue(checkFile2(in3,expected));
 
       /*
        * testing READ interface on DN using a BlockReader
        */
-      DFSClient client = null;
-      try {
-        client = new DFSClient(new InetSocketAddress("localhost",
+    DFSClient client = null;
+    try {
+      client = new DFSClient(new InetSocketAddress("localhost",
           cluster.getNameNodePort()), conf);
-      } finally {
-        if (client != null) client.close();
-      }
-      List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
-          FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
-      LocatedBlock lblock = locatedBlocks.get(0); // first block
-      Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();
-      // verify token is not expired
-      assertFalse(SecurityTestUtil.isBlockTokenExpired(myToken));
-      // read with valid token, should succeed
-      tryRead(conf, lblock, true);
+    } finally {
+      if (client != null) client.close();
+    }
+    List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
+        FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
+    LocatedBlock lblock = locatedBlocks.get(0); // first block
+    // verify token is not expired
+    assertFalse(isBlockTokenExpired(lblock));
+    // read with valid token, should succeed
+    tryRead(conf, lblock, true);
 
       /*
        * wait till myToken and all cached tokens in in1, in2 and in3 expire
        */
 
-      while (!SecurityTestUtil.isBlockTokenExpired(myToken)) {
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException ignored) {
-        }
+    while (!isBlockTokenExpired(lblock)) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ignored) {
       }
+    }
 
       /*
        * continue testing READ interface on DN using a BlockReader
        */
 
-      // verify token is expired
-      assertTrue(SecurityTestUtil.isBlockTokenExpired(myToken));
-      // read should fail
-      tryRead(conf, lblock, false);
-      // use a valid new token
-      lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
-              EnumSet.of(BlockTokenIdentifier.AccessMode.READ)));
-      // read should succeed
-      tryRead(conf, lblock, true);
-      // use a token with wrong blockID
-      ExtendedBlock wrongBlock = new ExtendedBlock(lblock.getBlock()
-          .getBlockPoolId(), lblock.getBlock().getBlockId() + 1);
-      lblock.setBlockToken(sm.generateToken(wrongBlock,
-          EnumSet.of(BlockTokenIdentifier.AccessMode.READ)));
-      // read should fail
-      tryRead(conf, lblock, false);
-      // use a token with wrong access modes
-      lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
-          EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE,
-                     BlockTokenIdentifier.AccessMode.COPY,
-                     BlockTokenIdentifier.AccessMode.REPLACE)));
-      // read should fail
-      tryRead(conf, lblock, false);
-
-      // set a long token lifetime for future tokens
-      SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L);
+    // verify token is expired
+    assertTrue(isBlockTokenExpired(lblock));
+    // read should fail
+    tryRead(conf, lblock, false);
+    // use a valid new token
+    bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.READ);
+    // read should succeed
+    tryRead(conf, lblock, true);
+    // use a token with wrong blockID
+    long rightId = lblock.getBlock().getBlockId();
+    long wrongId = rightId + 1;
+    lblock.getBlock().setBlockId(wrongId);
+    bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.READ);
+    lblock.getBlock().setBlockId(rightId);
+    // read should fail
+    tryRead(conf, lblock, false);
+    // use a token with wrong access modes
+    bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.WRITE);
+    // read should fail
+    tryRead(conf, lblock, false);
+
+    // set a long token lifetime for future tokens
+    SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L);
 
       /*
        * testing that when cached tokens are expired, DFSClient will re-fetch
        * tokens transparently for READ.
        */
 
-      // confirm all tokens cached in in1 are expired by now
-      List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
-      for (LocatedBlock blk : lblocks) {
-        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify blockSeekTo() is able to re-fetch token transparently
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
-
-      // confirm all tokens cached in in2 are expired by now
-      List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
-      for (LocatedBlock blk : lblocks2) {
-        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify blockSeekTo() is able to re-fetch token transparently (testing
-      // via another interface method)
+    // confirm all tokens cached in in1 are expired by now
+    List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
+    for (LocatedBlock blk : lblocks) {
+      assertTrue(isBlockTokenExpired(blk));
+    }
+    // verify blockSeekTo() is able to re-fetch token transparently
+    in1.seek(0);
+    assertTrue(checkFile1(in1, expected));
+
+    // confirm all tokens cached in in2 are expired by now
+    List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
+    for (LocatedBlock blk : lblocks2) {
+      assertTrue(isBlockTokenExpired(blk));
+    }
+    // verify blockSeekTo() is able to re-fetch token transparently (testing
+    // via another interface method)
+    if (isStriped) {
+      // striped block doesn't support seekToNewSource
+      in2.seek(0);
+    } else {
       assertTrue(in2.seekToNewSource(0));
-      assertTrue(checkFile1(in2));
+    }
+    assertTrue(checkFile1(in2,expected));
 
-      // confirm all tokens cached in in3 are expired by now
-      List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
-      for (LocatedBlock blk : lblocks3) {
-        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify fetchBlockByteRange() is able to re-fetch token transparently
-      assertTrue(checkFile2(in3));
+    // confirm all tokens cached in in3 are expired by now
+    List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
+    for (LocatedBlock blk : lblocks3) {
+      assertTrue(isBlockTokenExpired(blk));
+    }
+    // verify fetchBlockByteRange() is able to re-fetch token transparently
+    assertTrue(checkFile2(in3,expected));
 
       /*
        * testing that after datanodes are restarted on the same ports, cached
@@ -473,37 +489,42 @@ public class TestBlockTokenWithDFS {
        * new tokens can be fetched from namenode).
        */
 
-      // restart datanodes on the same ports that they currently use
-      assertTrue(cluster.restartDataNodes(true));
-      cluster.waitActive();
-      assertEquals(numDataNodes, cluster.getDataNodes().size());
-      cluster.shutdownNameNode(0);
+    // restart datanodes on the same ports that they currently use
+    assertTrue(cluster.restartDataNodes(true));
+    cluster.waitActive();
+    assertEquals(numDataNodes, cluster.getDataNodes().size());
+    cluster.shutdownNameNode(0);
 
-      // confirm tokens cached in in1 are still valid
-      lblocks = DFSTestUtil.getAllBlocks(in1);
-      for (LocatedBlock blk : lblocks) {
-        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify blockSeekTo() still works (forced to use cached tokens)
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
-
-      // confirm tokens cached in in2 are still valid
-      lblocks2 = DFSTestUtil.getAllBlocks(in2);
-      for (LocatedBlock blk : lblocks2) {
-        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify blockSeekTo() still works (forced to use cached tokens)
+    // confirm tokens cached in in1 are still valid
+    lblocks = DFSTestUtil.getAllBlocks(in1);
+    for (LocatedBlock blk : lblocks) {
+      assertFalse(isBlockTokenExpired(blk));
+    }
+    // verify blockSeekTo() still works (forced to use cached tokens)
+    in1.seek(0);
+    assertTrue(checkFile1(in1,expected));
+
+    // confirm tokens cached in in2 are still valid
+    lblocks2 = DFSTestUtil.getAllBlocks(in2);
+    for (LocatedBlock blk : lblocks2) {
+      assertFalse(isBlockTokenExpired(blk));
+    }
+
+    // verify blockSeekTo() still works (forced to use cached tokens)
+    if (isStriped) {
+      in2.seek(0);
+    } else {
       in2.seekToNewSource(0);
-      assertTrue(checkFile1(in2));
+    }
+    assertTrue(checkFile1(in2,expected));
 
-      // confirm tokens cached in in3 are still valid
-      lblocks3 = DFSTestUtil.getAllBlocks(in3);
-      for (LocatedBlock blk : lblocks3) {
-        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify fetchBlockByteRange() still works (forced to use cached tokens)
-      assertTrue(checkFile2(in3));
+    // confirm tokens cached in in3 are still valid
+    lblocks3 = DFSTestUtil.getAllBlocks(in3);
+    for (LocatedBlock blk : lblocks3) {
+      assertFalse(isBlockTokenExpired(blk));
+    }
+    // verify fetchBlockByteRange() still works (forced to use cached tokens)
+    assertTrue(checkFile2(in3,expected));
 
       /*
        * testing that when namenode is restarted, cached tokens should still
@@ -512,18 +533,23 @@ public class TestBlockTokenWithDFS {
        * setup for this test depends on the previous test.
        */
 
-      // restart the namenode and then shut it down for test
-      cluster.restartNameNode(0);
-      cluster.shutdownNameNode(0);
+    // restart the namenode and then shut it down for test
+    cluster.restartNameNode(0);
+    cluster.shutdownNameNode(0);
 
-      // verify blockSeekTo() still works (forced to use cached tokens)
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
-      // verify again blockSeekTo() still works (forced to use cached tokens)
+    // verify blockSeekTo() still works (forced to use cached tokens)
+    in1.seek(0);
+    assertTrue(checkFile1(in1,expected));
+    // verify again blockSeekTo() still works (forced to use cached tokens)
+    if (isStriped) {
+      in2.seek(0);
+    } else {
       in2.seekToNewSource(0);
-      assertTrue(checkFile1(in2));
-      // verify fetchBlockByteRange() still works (forced to use cached tokens)
-      assertTrue(checkFile2(in3));
+    }
+    assertTrue(checkFile1(in2,expected));
+
+    // verify fetchBlockByteRange() still works (forced to use cached tokens)
+    assertTrue(checkFile2(in3,expected));
 
       /*
        * testing that after both namenode and datanodes got restarted (namenode
@@ -532,58 +558,60 @@ public class TestBlockTokenWithDFS {
        * setup of this test depends on the previous test.
        */
 
-      // restore the cluster and restart the datanodes for test
-      cluster.restartNameNode(0);
-      assertTrue(cluster.restartDataNodes(true));
-      cluster.waitActive();
-      assertEquals(numDataNodes, cluster.getDataNodes().size());
-
-      // shutdown namenode so that DFSClient can't get new tokens from namenode
-      cluster.shutdownNameNode(0);
-
-      // verify blockSeekTo() fails (cached tokens become invalid)
-      in1.seek(0);
-      assertFalse(checkFile1(in1));
-      // verify fetchBlockByteRange() fails (cached tokens become invalid)
-      assertFalse(checkFile2(in3));
-
-      // restart the namenode to allow DFSClient to re-fetch tokens
-      cluster.restartNameNode(0);
-      // verify blockSeekTo() works again (by transparently re-fetching
-      // tokens from namenode)
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
+    // restore the cluster and restart the datanodes for test
+    cluster.restartNameNode(0);
+    assertTrue(cluster.restartDataNodes(true));
+    cluster.waitActive();
+    assertEquals(numDataNodes, cluster.getDataNodes().size());
+
+    // shutdown namenode so that DFSClient can't get new tokens from namenode
+    cluster.shutdownNameNode(0);
+
+    // verify blockSeekTo() fails (cached tokens become invalid)
+    in1.seek(0);
+    assertFalse(checkFile1(in1,expected));
+    // verify fetchBlockByteRange() fails (cached tokens become invalid)
+    assertFalse(checkFile2(in3,expected));
+
+    // restart the namenode to allow DFSClient to re-fetch tokens
+    cluster.restartNameNode(0);
+    // verify blockSeekTo() works again (by transparently re-fetching
+    // tokens from namenode)
+    in1.seek(0);
+    assertTrue(checkFile1(in1,expected));
+    if (isStriped) {
+      in2.seek(0);
+    } else {
       in2.seekToNewSource(0);
-      assertTrue(checkFile1(in2));
-      // verify fetchBlockByteRange() works again (by transparently
-      // re-fetching tokens from namenode)
-      assertTrue(checkFile2(in3));
+    }
+    assertTrue(checkFile1(in2,expected));
+    // verify fetchBlockByteRange() works again (by transparently
+    // re-fetching tokens from namenode)
+    assertTrue(checkFile2(in3,expected));
 
       /*
        * testing that when datanodes are restarted on different ports, DFSClient
        * is able to re-fetch tokens transparently to connect to them
        */
 
-      // restart datanodes on newly assigned ports
-      assertTrue(cluster.restartDataNodes(false));
-      cluster.waitActive();
-      assertEquals(numDataNodes, cluster.getDataNodes().size());
-      // verify blockSeekTo() is able to re-fetch token transparently
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
-      // verify blockSeekTo() is able to re-fetch token transparently
+    // restart datanodes on newly assigned ports
+    assertTrue(cluster.restartDataNodes(false));
+    cluster.waitActive();
+    assertEquals(numDataNodes, cluster.getDataNodes().size());
+    // verify blockSeekTo() is able to re-fetch token transparently
+    in1.seek(0);
+    assertTrue(checkFile1(in1,expected));
+    // verify blockSeekTo() is able to re-fetch token transparently
+    if (isStriped) {
+      in2.seek(0);
+    } else {
       in2.seekToNewSource(0);
-      assertTrue(checkFile1(in2));
-      // verify fetchBlockByteRange() is able to re-fetch token transparently
-      assertTrue(checkFile2(in3));
-
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
     }
-  }
+    assertTrue(checkFile1(in2,expected));
+    // verify fetchBlockByteRange() is able to re-fetch token transparently
+    assertTrue(checkFile2(in3,expected));
 
+  }
   /**
    * Integration testing of access token, involving NN, DN, and Balancer
    */
@@ -593,4 +621,8 @@ public class TestBlockTokenWithDFS {
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     new TestBalancer().integrationTest(conf);
   }
+
+  protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException {
+    return SecurityTestUtil.isBlockTokenExpired(lb.getBlockToken());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06394e37/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
new file mode 100644
index 0000000..e212917
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
+
+  private final static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private final static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final static int stripesPerBlock = 4;
+  private final static int numDNs = dataBlocks + parityBlocks + 2;
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+
+  {
+    BLOCK_SIZE = cellSize * stripesPerBlock;
+    FILE_SIZE =  BLOCK_SIZE * dataBlocks * 3;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    conf = getConf();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient()
+        .createErasureCodingZone("/", null, cellSize);
+    cluster.waitActive();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private Configuration getConf() {
+    Configuration conf = super.getConf(numDNs);
+    conf.setInt("io.bytes.per.checksum", cellSize);
+    return conf;
+  }
+
+  @Test
+  @Override
+  public void testRead() throws Exception {
+    //TODO: DFSStripedInputStream handles token expiration
+//    doTestRead(conf, cluster, true);
+  }
+
+  @Test
+  @Override
+  public void testWrite() throws Exception {
+    //TODO: DFSStripedOutputStream handles token expiration
+  }
+
+  @Test
+  @Override
+  public void testAppend() throws Exception {
+    //TODO: support Append for striped file
+  }
+
+  @Test
+  @Override
+  public void testEnd2End() throws Exception {
+    //TODO: DFSStripedOutputStream handles token expiration
+  }
+
+  @Override
+  protected void tryRead(final Configuration conf, LocatedBlock lblock,
+                         boolean shouldSucceed) {
+    LocatedStripedBlock lsb = (LocatedStripedBlock) lblock;
+    LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup
+        (lsb, cellSize, dataBlocks, parityBlocks);
+    for (LocatedBlock internalBlock : internalBlocks) {
+      super.tryRead(conf, internalBlock, shouldSucceed);
+    }
+  }
+
+  @Override
+  protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException {
+    LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
+    LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup
+        (lsb, cellSize, dataBlocks, parityBlocks);
+    for (LocatedBlock internalBlock : internalBlocks) {
+      if(super.isBlockTokenExpired(internalBlock)){
+        return true;
+      }
+    }
+    return false;
+  }
+}


Mime
View raw message