hadoop-common-commits mailing list archives

From: z..@apache.org
Subject: [18/36] hadoop git commit: HDFS-7285. Erasure Coding Support inside HDFS.
Date: Fri, 14 Aug 2015 17:55:30 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
index 43f2992..26ed1fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 
@@ -69,28 +68,32 @@ import org.junit.Test;
 
 public class TestBlockTokenWithDFS {
 
-  private static final int BLOCK_SIZE = 1024;
-  private static final int FILE_SIZE = 2 * BLOCK_SIZE;
+  protected static int BLOCK_SIZE = 1024;
+  protected static int FILE_SIZE = 2 * BLOCK_SIZE;
   private static final String FILE_TO_READ = "/fileToRead.dat";
   private static final String FILE_TO_WRITE = "/fileToWrite.dat";
   private static final String FILE_TO_APPEND = "/fileToAppend.dat";
-  private final byte[] rawData = new byte[FILE_SIZE];
 
   {
     ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  public static byte[] generateBytes(int fileSize){
     Random r = new Random();
+    byte[] rawData = new byte[fileSize];
     r.nextBytes(rawData);
+    return rawData;
   }
 
-  private void createFile(FileSystem fs, Path filename) throws IOException {
+  private void createFile(FileSystem fs, Path filename, byte[] expected) throws IOException {
     FSDataOutputStream out = fs.create(filename);
-    out.write(rawData);
+    out.write(expected);
     out.close();
   }
 
   // read a file using blockSeekTo()
-  private boolean checkFile1(FSDataInputStream in) {
-    byte[] toRead = new byte[FILE_SIZE];
+  private boolean checkFile1(FSDataInputStream in, byte[] expected) {
+    byte[] toRead = new byte[expected.length];
     int totalRead = 0;
     int nRead = 0;
     try {
@@ -101,27 +104,27 @@ public class TestBlockTokenWithDFS {
       return false;
     }
     assertEquals("Cannot read file.", toRead.length, totalRead);
-    return checkFile(toRead);
+    return checkFile(toRead, expected);
   }
 
   // read a file using fetchBlockByteRange()
-  private boolean checkFile2(FSDataInputStream in) {
-    byte[] toRead = new byte[FILE_SIZE];
+  private boolean checkFile2(FSDataInputStream in, byte[] expected) {
+    byte[] toRead = new byte[expected.length];
     try {
       assertEquals("Cannot read file", toRead.length, in.read(0, toRead, 0,
           toRead.length));
     } catch (IOException e) {
       return false;
     }
-    return checkFile(toRead);
+    return checkFile(toRead, expected);
   }
 
-  private boolean checkFile(byte[] fileToCheck) {
-    if (fileToCheck.length != rawData.length) {
+  private boolean checkFile(byte[] fileToCheck, byte[] expected) {
+    if (fileToCheck.length != expected.length) {
       return false;
     }
     for (int i = 0; i < fileToCheck.length; i++) {
-      if (fileToCheck[i] != rawData[i]) {
+      if (fileToCheck[i] != expected[i]) {
         return false;
       }
     }
@@ -137,7 +140,7 @@ public class TestBlockTokenWithDFS {
   }
 
   // try reading a block using a BlockReader directly
-  private static void tryRead(final Configuration conf, LocatedBlock lblock,
+  protected void tryRead(final Configuration conf, LocatedBlock lblock,
       boolean shouldSucceed) {
     InetSocketAddress targetAddr = null;
     IOException ioe = null;
@@ -148,7 +151,7 @@ public class TestBlockTokenWithDFS {
       targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
 
       blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
-          setFileName(BlockReaderFactory.getFileName(targetAddr, 
+          setFileName(BlockReaderFactory.getFileName(targetAddr,
                         "test-blockpoolid", block.getBlockId())).
           setBlock(block).
           setBlockToken(lblock.getBlockToken()).
@@ -205,7 +208,7 @@ public class TestBlockTokenWithDFS {
   }
 
   // get a conf for testing
-  private static Configuration getConf(int numDataNodes) {
+  protected Configuration getConf(int numDataNodes) {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@@ -241,16 +244,16 @@ public class TestBlockTokenWithDFS {
       SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
       Path fileToAppend = new Path(FILE_TO_APPEND);
       FileSystem fs = cluster.getFileSystem();
-
+      byte[] expected = generateBytes(FILE_SIZE);
       // write a one-byte file
       FSDataOutputStream stm = writeFile(fs, fileToAppend,
           (short) numDataNodes, BLOCK_SIZE);
-      stm.write(rawData, 0, 1);
+      stm.write(expected, 0, 1);
       stm.close();
       // open the file again for append
       stm = fs.append(fileToAppend);
-      int mid = rawData.length - 1;
-      stm.write(rawData, 1, mid - 1);
+      int mid = expected.length - 1;
+      stm.write(expected, 1, mid - 1);
       stm.hflush();
 
       /*
@@ -267,11 +270,11 @@ public class TestBlockTokenWithDFS {
       // remove a datanode to force re-establishing pipeline
       cluster.stopDataNode(0);
       // append the rest of the file
-      stm.write(rawData, mid, rawData.length - mid);
+      stm.write(expected, mid, expected.length - mid);
       stm.close();
       // check if append is successful
       FSDataInputStream in5 = fs.open(fileToAppend);
-      assertTrue(checkFile1(in5));
+      assertTrue(checkFile1(in5, expected));
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -303,11 +306,12 @@ public class TestBlockTokenWithDFS {
       Path fileToWrite = new Path(FILE_TO_WRITE);
       FileSystem fs = cluster.getFileSystem();
 
+      byte[] expected = generateBytes(FILE_SIZE);
       FSDataOutputStream stm = writeFile(fs, fileToWrite, (short) numDataNodes,
           BLOCK_SIZE);
       // write a partial block
-      int mid = rawData.length - 1;
-      stm.write(rawData, 0, mid);
+      int mid = expected.length - 1;
+      stm.write(expected, 0, mid);
       stm.hflush();
 
       /*
@@ -324,11 +328,11 @@ public class TestBlockTokenWithDFS {
       // remove a datanode to force re-establishing pipeline
       cluster.stopDataNode(0);
       // write the rest of the file
-      stm.write(rawData, mid, rawData.length - mid);
+      stm.write(expected, mid, expected.length - mid);
       stm.close();
       // check if write is successful
       FSDataInputStream in4 = fs.open(fileToWrite);
-      assertTrue(checkFile1(in4));
+      assertTrue(checkFile1(in4, expected));
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -346,125 +350,137 @@ public class TestBlockTokenWithDFS {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
       cluster.waitActive();
       assertEquals(numDataNodes, cluster.getDataNodes().size());
+      doTestRead(conf, cluster, false);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 
-      final NameNode nn = cluster.getNameNode();
-      final NamenodeProtocols nnProto = nn.getRpcServer();
-      final BlockManager bm = nn.getNamesystem().getBlockManager();
-      final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
+  protected void doTestRead(Configuration conf, MiniDFSCluster cluster,
+      boolean isStriped) throws Exception {
+    final int numDataNodes = cluster.getDataNodes().size();
+    final NameNode nn = cluster.getNameNode();
+    final NamenodeProtocols nnProto = nn.getRpcServer();
+    final BlockManager bm = nn.getNamesystem().getBlockManager();
+    final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
 
-      // set a short token lifetime (1 second) initially
-      SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
+    // set a short token lifetime (1 second) initially
+    SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
 
-      Path fileToRead = new Path(FILE_TO_READ);
-      FileSystem fs = cluster.getFileSystem();
-      createFile(fs, fileToRead);
+    Path fileToRead = new Path(FILE_TO_READ);
+    FileSystem fs = cluster.getFileSystem();
+    byte[] expected = generateBytes(FILE_SIZE);
+    createFile(fs, fileToRead, expected);
 
       /*
        * setup for testing expiration handling of cached tokens
        */
 
-      // read using blockSeekTo(). Acquired tokens are cached in in1
-      FSDataInputStream in1 = fs.open(fileToRead);
-      assertTrue(checkFile1(in1));
-      // read using blockSeekTo(). Acquired tokens are cached in in2
-      FSDataInputStream in2 = fs.open(fileToRead);
-      assertTrue(checkFile1(in2));
-      // read using fetchBlockByteRange(). Acquired tokens are cached in in3
-      FSDataInputStream in3 = fs.open(fileToRead);
-      assertTrue(checkFile2(in3));
+    // read using blockSeekTo(). Acquired tokens are cached in in1
+    FSDataInputStream in1 = fs.open(fileToRead);
+    assertTrue(checkFile1(in1,expected));
+    // read using blockSeekTo(). Acquired tokens are cached in in2
+    FSDataInputStream in2 = fs.open(fileToRead);
+    assertTrue(checkFile1(in2,expected));
+    // read using fetchBlockByteRange(). Acquired tokens are cached in in3
+    FSDataInputStream in3 = fs.open(fileToRead);
+    assertTrue(checkFile2(in3,expected));
 
       /*
        * testing READ interface on DN using a BlockReader
        */
-      DFSClient client = null;
-      try {
-        client = new DFSClient(new InetSocketAddress("localhost",
+    DFSClient client = null;
+    try {
+      client = new DFSClient(new InetSocketAddress("localhost",
           cluster.getNameNodePort()), conf);
-      } finally {
-        if (client != null) client.close();
-      }
-      List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
-          FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
-      LocatedBlock lblock = locatedBlocks.get(0); // first block
-      Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();
-      // verify token is not expired
-      assertFalse(SecurityTestUtil.isBlockTokenExpired(myToken));
-      // read with valid token, should succeed
-      tryRead(conf, lblock, true);
+    } finally {
+      if (client != null) client.close();
+    }
+    List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
+        FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
+    LocatedBlock lblock = locatedBlocks.get(0); // first block
+    // verify token is not expired
+    assertFalse(isBlockTokenExpired(lblock));
+    // read with valid token, should succeed
+    tryRead(conf, lblock, true);
 
       /*
        * wait till myToken and all cached tokens in in1, in2 and in3 expire
        */
 
-      while (!SecurityTestUtil.isBlockTokenExpired(myToken)) {
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException ignored) {
-        }
+    while (!isBlockTokenExpired(lblock)) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ignored) {
       }
+    }
 
       /*
        * continue testing READ interface on DN using a BlockReader
        */
 
-      // verify token is expired
-      assertTrue(SecurityTestUtil.isBlockTokenExpired(myToken));
-      // read should fail
-      tryRead(conf, lblock, false);
-      // use a valid new token
-      lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
-              EnumSet.of(BlockTokenIdentifier.AccessMode.READ)));
-      // read should succeed
-      tryRead(conf, lblock, true);
-      // use a token with wrong blockID
-      ExtendedBlock wrongBlock = new ExtendedBlock(lblock.getBlock()
-          .getBlockPoolId(), lblock.getBlock().getBlockId() + 1);
-      lblock.setBlockToken(sm.generateToken(wrongBlock,
-          EnumSet.of(BlockTokenIdentifier.AccessMode.READ)));
-      // read should fail
-      tryRead(conf, lblock, false);
-      // use a token with wrong access modes
-      lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
-          EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE,
-                     BlockTokenIdentifier.AccessMode.COPY,
-                     BlockTokenIdentifier.AccessMode.REPLACE)));
-      // read should fail
-      tryRead(conf, lblock, false);
-
-      // set a long token lifetime for future tokens
-      SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L);
+    // verify token is expired
+    assertTrue(isBlockTokenExpired(lblock));
+    // read should fail
+    tryRead(conf, lblock, false);
+    // use a valid new token
+    bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.READ);
+    // read should succeed
+    tryRead(conf, lblock, true);
+    // use a token with wrong blockID
+    long rightId = lblock.getBlock().getBlockId();
+    long wrongId = rightId + 1;
+    lblock.getBlock().setBlockId(wrongId);
+    bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.READ);
+    lblock.getBlock().setBlockId(rightId);
+    // read should fail
+    tryRead(conf, lblock, false);
+    // use a token with wrong access modes
+    bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.WRITE);
+    // read should fail
+    tryRead(conf, lblock, false);
+
+    // set a long token lifetime for future tokens
+    SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L);
 
       /*
        * testing that when cached tokens are expired, DFSClient will re-fetch
        * tokens transparently for READ.
        */
 
-      // confirm all tokens cached in in1 are expired by now
-      List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
-      for (LocatedBlock blk : lblocks) {
-        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify blockSeekTo() is able to re-fetch token transparently
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
-
-      // confirm all tokens cached in in2 are expired by now
-      List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
-      for (LocatedBlock blk : lblocks2) {
-        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify blockSeekTo() is able to re-fetch token transparently (testing
-      // via another interface method)
+    // confirm all tokens cached in in1 are expired by now
+    List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
+    for (LocatedBlock blk : lblocks) {
+      assertTrue(isBlockTokenExpired(blk));
+    }
+    // verify blockSeekTo() is able to re-fetch token transparently
+    in1.seek(0);
+    assertTrue(checkFile1(in1, expected));
+
+    // confirm all tokens cached in in2 are expired by now
+    List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
+    for (LocatedBlock blk : lblocks2) {
+      assertTrue(isBlockTokenExpired(blk));
+    }
+    // verify blockSeekTo() is able to re-fetch token transparently (testing
+    // via another interface method)
+    if (isStriped) {
+      // striped block doesn't support seekToNewSource
+      in2.seek(0);
+    } else {
       assertTrue(in2.seekToNewSource(0));
-      assertTrue(checkFile1(in2));
+    }
+    assertTrue(checkFile1(in2,expected));
 
-      // confirm all tokens cached in in3 are expired by now
-      List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
-      for (LocatedBlock blk : lblocks3) {
-        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify fetchBlockByteRange() is able to re-fetch token transparently
-      assertTrue(checkFile2(in3));
+    // confirm all tokens cached in in3 are expired by now
+    List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
+    for (LocatedBlock blk : lblocks3) {
+      assertTrue(isBlockTokenExpired(blk));
+    }
+    // verify fetchBlockByteRange() is able to re-fetch token transparently
+    assertTrue(checkFile2(in3,expected));
 
       /*
        * testing that after datanodes are restarted on the same ports, cached
@@ -473,37 +489,42 @@ public class TestBlockTokenWithDFS {
        * new tokens can be fetched from namenode).
        */
 
-      // restart datanodes on the same ports that they currently use
-      assertTrue(cluster.restartDataNodes(true));
-      cluster.waitActive();
-      assertEquals(numDataNodes, cluster.getDataNodes().size());
-      cluster.shutdownNameNode(0);
+    // restart datanodes on the same ports that they currently use
+    assertTrue(cluster.restartDataNodes(true));
+    cluster.waitActive();
+    assertEquals(numDataNodes, cluster.getDataNodes().size());
+    cluster.shutdownNameNode(0);
 
-      // confirm tokens cached in in1 are still valid
-      lblocks = DFSTestUtil.getAllBlocks(in1);
-      for (LocatedBlock blk : lblocks) {
-        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify blockSeekTo() still works (forced to use cached tokens)
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
-
-      // confirm tokens cached in in2 are still valid
-      lblocks2 = DFSTestUtil.getAllBlocks(in2);
-      for (LocatedBlock blk : lblocks2) {
-        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify blockSeekTo() still works (forced to use cached tokens)
+    // confirm tokens cached in in1 are still valid
+    lblocks = DFSTestUtil.getAllBlocks(in1);
+    for (LocatedBlock blk : lblocks) {
+      assertFalse(isBlockTokenExpired(blk));
+    }
+    // verify blockSeekTo() still works (forced to use cached tokens)
+    in1.seek(0);
+    assertTrue(checkFile1(in1,expected));
+
+    // confirm tokens cached in in2 are still valid
+    lblocks2 = DFSTestUtil.getAllBlocks(in2);
+    for (LocatedBlock blk : lblocks2) {
+      assertFalse(isBlockTokenExpired(blk));
+    }
+
+    // verify blockSeekTo() still works (forced to use cached tokens)
+    if (isStriped) {
+      in2.seek(0);
+    } else {
       in2.seekToNewSource(0);
-      assertTrue(checkFile1(in2));
+    }
+    assertTrue(checkFile1(in2,expected));
 
-      // confirm tokens cached in in3 are still valid
-      lblocks3 = DFSTestUtil.getAllBlocks(in3);
-      for (LocatedBlock blk : lblocks3) {
-        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify fetchBlockByteRange() still works (forced to use cached tokens)
-      assertTrue(checkFile2(in3));
+    // confirm tokens cached in in3 are still valid
+    lblocks3 = DFSTestUtil.getAllBlocks(in3);
+    for (LocatedBlock blk : lblocks3) {
+      assertFalse(isBlockTokenExpired(blk));
+    }
+    // verify fetchBlockByteRange() still works (forced to use cached tokens)
+    assertTrue(checkFile2(in3,expected));
 
       /*
        * testing that when namenode is restarted, cached tokens should still
@@ -512,18 +533,23 @@ public class TestBlockTokenWithDFS {
        * setup for this test depends on the previous test.
        */
 
-      // restart the namenode and then shut it down for test
-      cluster.restartNameNode(0);
-      cluster.shutdownNameNode(0);
+    // restart the namenode and then shut it down for test
+    cluster.restartNameNode(0);
+    cluster.shutdownNameNode(0);
 
-      // verify blockSeekTo() still works (forced to use cached tokens)
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
-      // verify again blockSeekTo() still works (forced to use cached tokens)
+    // verify blockSeekTo() still works (forced to use cached tokens)
+    in1.seek(0);
+    assertTrue(checkFile1(in1,expected));
+    // verify again blockSeekTo() still works (forced to use cached tokens)
+    if (isStriped) {
+      in2.seek(0);
+    } else {
       in2.seekToNewSource(0);
-      assertTrue(checkFile1(in2));
-      // verify fetchBlockByteRange() still works (forced to use cached tokens)
-      assertTrue(checkFile2(in3));
+    }
+    assertTrue(checkFile1(in2,expected));
+
+    // verify fetchBlockByteRange() still works (forced to use cached tokens)
+    assertTrue(checkFile2(in3,expected));
 
       /*
        * testing that after both namenode and datanodes got restarted (namenode
@@ -532,58 +558,60 @@ public class TestBlockTokenWithDFS {
        * setup of this test depends on the previous test.
        */
 
-      // restore the cluster and restart the datanodes for test
-      cluster.restartNameNode(0);
-      assertTrue(cluster.restartDataNodes(true));
-      cluster.waitActive();
-      assertEquals(numDataNodes, cluster.getDataNodes().size());
-
-      // shutdown namenode so that DFSClient can't get new tokens from namenode
-      cluster.shutdownNameNode(0);
-
-      // verify blockSeekTo() fails (cached tokens become invalid)
-      in1.seek(0);
-      assertFalse(checkFile1(in1));
-      // verify fetchBlockByteRange() fails (cached tokens become invalid)
-      assertFalse(checkFile2(in3));
-
-      // restart the namenode to allow DFSClient to re-fetch tokens
-      cluster.restartNameNode(0);
-      // verify blockSeekTo() works again (by transparently re-fetching
-      // tokens from namenode)
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
+    // restore the cluster and restart the datanodes for test
+    cluster.restartNameNode(0);
+    assertTrue(cluster.restartDataNodes(true));
+    cluster.waitActive();
+    assertEquals(numDataNodes, cluster.getDataNodes().size());
+
+    // shutdown namenode so that DFSClient can't get new tokens from namenode
+    cluster.shutdownNameNode(0);
+
+    // verify blockSeekTo() fails (cached tokens become invalid)
+    in1.seek(0);
+    assertFalse(checkFile1(in1,expected));
+    // verify fetchBlockByteRange() fails (cached tokens become invalid)
+    assertFalse(checkFile2(in3,expected));
+
+    // restart the namenode to allow DFSClient to re-fetch tokens
+    cluster.restartNameNode(0);
+    // verify blockSeekTo() works again (by transparently re-fetching
+    // tokens from namenode)
+    in1.seek(0);
+    assertTrue(checkFile1(in1,expected));
+    if (isStriped) {
+      in2.seek(0);
+    } else {
       in2.seekToNewSource(0);
-      assertTrue(checkFile1(in2));
-      // verify fetchBlockByteRange() works again (by transparently
-      // re-fetching tokens from namenode)
-      assertTrue(checkFile2(in3));
+    }
+    assertTrue(checkFile1(in2,expected));
+    // verify fetchBlockByteRange() works again (by transparently
+    // re-fetching tokens from namenode)
+    assertTrue(checkFile2(in3,expected));
 
       /*
        * testing that when datanodes are restarted on different ports, DFSClient
        * is able to re-fetch tokens transparently to connect to them
        */
 
-      // restart datanodes on newly assigned ports
-      assertTrue(cluster.restartDataNodes(false));
-      cluster.waitActive();
-      assertEquals(numDataNodes, cluster.getDataNodes().size());
-      // verify blockSeekTo() is able to re-fetch token transparently
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
-      // verify blockSeekTo() is able to re-fetch token transparently
+    // restart datanodes on newly assigned ports
+    assertTrue(cluster.restartDataNodes(false));
+    cluster.waitActive();
+    assertEquals(numDataNodes, cluster.getDataNodes().size());
+    // verify blockSeekTo() is able to re-fetch token transparently
+    in1.seek(0);
+    assertTrue(checkFile1(in1,expected));
+    // verify blockSeekTo() is able to re-fetch token transparently
+    if (isStriped) {
+      in2.seek(0);
+    } else {
       in2.seekToNewSource(0);
-      assertTrue(checkFile1(in2));
-      // verify fetchBlockByteRange() is able to re-fetch token transparently
-      assertTrue(checkFile2(in3));
-
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
     }
-  }
+    assertTrue(checkFile1(in2,expected));
+    // verify fetchBlockByteRange() is able to re-fetch token transparently
+    assertTrue(checkFile2(in3,expected));
 
+  }
   /**
    * Integration testing of access token, involving NN, DN, and Balancer
    */
@@ -593,4 +621,8 @@ public class TestBlockTokenWithDFS {
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     new TestBalancer().integrationTest(conf);
   }
+
+  protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException {
+    return SecurityTestUtil.isBlockTokenExpired(lb.getBlockToken());
+  }
 }

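The hunk above turns TestBlockTokenWithDFS's private static helpers (tryRead, getConf, the checkFile methods, the rawData field) into protected instance members and extracts doTestRead(conf, cluster, isStriped), so a striped-file subclass can reuse the whole token scenario while overriding only the block-access pieces. A minimal standalone sketch of that template-method shape, with purely illustrative class and method names (not the actual Hadoop test classes):

// Illustrative only: mirrors the refactor's shape, not the real HDFS test code.
public class BaseReadTest {
  protected static int BLOCK_SIZE = 1024;          // subclasses may resize

  // Hook: how a single block (or block group) is read.
  protected void tryRead(long blockId, boolean shouldSucceed) {
    // replicated-block read path would go here
  }

  // Hook: what "token expired" means for the block type under test.
  protected boolean isBlockTokenExpired(long blockId) {
    return false;
  }

  // Shared scenario, driven entirely through the hooks above.
  protected void doTestRead(boolean isStriped) {
    tryRead(1L, true);
    // striped streams skip seekToNewSource(), as in the patch
  }
}

class StripedReadTest extends BaseReadTest {
  private static final int GROUP_SIZE = 9;         // illustrative: data + parity

  { BLOCK_SIZE = 64 * 1024; }                      // instance initializer, as in the patch

  @Override
  protected void tryRead(long blockGroupId, boolean shouldSucceed) {
    // fan the read out over every internal block of the group
    for (int i = 0; i < GROUP_SIZE; i++) {
      super.tryRead(blockGroupId + i, shouldSucceed);
    }
  }

  @Override
  protected boolean isBlockTokenExpired(long blockGroupId) {
    // the group counts as expired if any internal block's token has expired
    for (int i = 0; i < GROUP_SIZE; i++) {
      if (super.isBlockTokenExpired(blockGroupId + i)) {
        return true;
      }
    }
    return false;
  }
}
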
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
new file mode 100644
index 0000000..f985f54
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
+
+  private final static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private final static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final static int stripesPerBlock = 4;
+  private final static int numDNs = dataBlocks + parityBlocks + 2;
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+
+  {
+    BLOCK_SIZE = cellSize * stripesPerBlock;
+    FILE_SIZE =  BLOCK_SIZE * dataBlocks * 3;
+  }
+
+  private Configuration getConf() {
+    Configuration conf = super.getConf(numDNs);
+    conf.setInt("io.bytes.per.checksum", cellSize);
+    return conf;
+  }
+
+  @Test
+  @Override
+  public void testRead() throws Exception {
+    conf = getConf();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient()
+        .createErasureCodingZone("/", null, cellSize);
+    try {
+      cluster.waitActive();
+      doTestRead(conf, cluster, true);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * tested at {@link org.apache.hadoop.hdfs.TestDFSStripedOutputStreamWithFailure#testBlockTokenExpired()}
+   */
+  @Test
+  @Override
+  public void testWrite(){
+  }
+
+  @Test
+  @Override
+  public void testAppend() throws Exception {
+    //TODO: support Append for striped file
+  }
+
+  @Test
+  @Override
+  public void testEnd2End() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    new TestBalancer().integrationTestWithStripedFile(conf);
+  }
+
+  @Override
+  protected void tryRead(final Configuration conf, LocatedBlock lblock,
+                         boolean shouldSucceed) {
+    LocatedStripedBlock lsb = (LocatedStripedBlock) lblock;
+    LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup
+        (lsb, cellSize, dataBlocks, parityBlocks);
+    for (LocatedBlock internalBlock : internalBlocks) {
+      super.tryRead(conf, internalBlock, shouldSucceed);
+    }
+  }
+
+  @Override
+  protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException {
+    LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
+    LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup
+        (lsb, cellSize, dataBlocks, parityBlocks);
+    for (LocatedBlock internalBlock : internalBlocks) {
+      if(super.isBlockTokenExpired(internalBlock)){
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
index 6fc30ba..c1218a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index cea6865..b11b48a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -185,9 +185,12 @@ public class TestNameNodePrunesMissingStorages {
       String datanodeUuid;
       // Find the first storage which this block is in.
       try {
+        BlockInfo storedBlock =
+            cluster.getNamesystem().getBlockManager().
+                getStoredBlock(block.getLocalBlock());
         Iterator<DatanodeStorageInfo> storageInfoIter =
             cluster.getNamesystem().getBlockManager().
-                getStorages(block.getLocalBlock()).iterator();
+                blocksMap.getStorages(storedBlock).iterator();
         assertTrue(storageInfoIter.hasNext());
         DatanodeStorageInfo info = storageInfoIter.next();
         storageIdToRemove = info.getStorageID();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
index 1c3f075..c33667d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
@@ -100,7 +100,7 @@ public class TestNodeCount {
       DatanodeDescriptor nonExcessDN = null;
       for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
         final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
-        Collection<Block> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
+        Collection<BlockInfo> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
         if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
           nonExcessDN = dn;
           break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
index 2d7bb44..83b3aa0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -42,7 +41,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 public class TestOverReplicatedBlocks {
@@ -185,7 +183,7 @@ public class TestOverReplicatedBlocks {
       // All replicas for deletion should be scheduled on lastDN.
       // And should not actually be deleted, because lastDN does not heartbeat.
       namesystem.readLock();
-      Collection<Block> dnBlocks = 
+      Collection<BlockInfo> dnBlocks =
         namesystem.getBlockManager().excessReplicateMap.get(lastDNid);
       assertEquals("Replicas on node " + lastDNid + " should have been deleted",
           SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 6553185..f5af898 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -1208,8 +1208,17 @@ public class TestReplicationPolicy {
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
-    BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
-    BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
+    long blkID1 = ThreadLocalRandom.current().nextLong();
+    if (blkID1 < 0) {
+      blkID1 *= -1;
+    }
+    long blkID2 = ThreadLocalRandom.current().nextLong();
+    if (blkID2 < 0) {
+      blkID2 *= -1;
+    }
+
+    BlockInfo block1 = genBlockInfo(blkID1);
+    BlockInfo block2 = genBlockInfo(blkID2);
 
     // Adding QUEUE_UNDER_REPLICATED block
     underReplicatedBlocks.add(block1, 0, 1, 1);
@@ -1224,7 +1233,7 @@ public class TestReplicationPolicy {
     chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
     assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
 
-    final BlockInfo info = new BlockInfoContiguous(block1, (short) 1);
+    final BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1);
     final BlockCollection mbc = mock(BlockCollection.class);
     when(mbc.getLastBlock()).thenReturn(info);
     when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1);
@@ -1247,12 +1256,12 @@ public class TestReplicationPolicy {
     when(storage.getState()).thenReturn(DatanodeStorage.State.NORMAL);
     when(storage.getDatanodeDescriptor()).thenReturn(dn);
     when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
-    when(storage.addBlock(any(BlockInfo.class))).thenReturn
+    when(storage.addBlock(any(BlockInfoContiguous.class))).thenReturn
         (DatanodeStorageInfo.AddBlockResult.ADDED);
-    ucBlock.addStorage(storage);
+    ucBlock.addStorage(storage, ucBlock);
 
-    when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
-    .thenReturn(ucBlock);
+    BlockInfo lastBlk = mbc.getLastBlock();
+    when(mbc.getLastBlock()).thenReturn(lastBlk, ucBlock);
 
     bm.convertLastBlockToUnderConstruction(mbc, 0L);
 
@@ -1287,7 +1296,7 @@ public class TestReplicationPolicy {
     chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
     assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
 
-    bm.setReplication((short)0, (short)1, "", block1);
+    bm.setReplication((short)0, (short)1, "", (BlockInfoContiguous) block1);
 
     // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
     // from QUEUE_VERY_UNDER_REPLICATED.

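The hunk above forces the two random block IDs non-negative before genBlockInfo() uses them. As a hedged aside (not part of the patch), java.util.concurrent.ThreadLocalRandom can also hand back a bounded non-negative long directly, which avoids the post-hoc sign flip:

import java.util.concurrent.ThreadLocalRandom;

// Sketch of an alternative way to draw a non-negative test block ID.
public class NonNegativeIdSketch {
  public static void main(String[] args) {
    long blkId = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); // in [0, Long.MAX_VALUE)
    System.out.println(blkId >= 0);   // always true
  }
}
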
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSequentialBlockGroupId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSequentialBlockGroupId.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSequentialBlockGroupId.java
new file mode 100644
index 0000000..2f2356f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSequentialBlockGroupId.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BLOCK_GROUP_INDEX_MASK;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_BLOCKS_IN_GROUP;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests the sequential blockGroup ID generation mechanism and blockGroup ID
+ * collision handling.
+ */
+public class TestSequentialBlockGroupId {
+  private static final Log LOG = LogFactory
+      .getLog("TestSequentialBlockGroupId");
+
+  private final short REPLICATION = 1;
+  private final long SEED = 0;
+  private final int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private final int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+
+  private final int stripesPerBlock = 2;
+  private final int blockSize = cellSize * stripesPerBlock;
+  private final int numDNs = dataBlocks + parityBlocks + 2;
+  private final int blockGrpCount = 4;
+  private final int fileLen = blockSize * dataBlocks * blockGrpCount;
+
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+  private SequentialBlockGroupIdGenerator blockGrpIdGenerator;
+  private Path eczone = new Path("/eczone");
+
+  @Before
+  public void setup() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.waitActive();
+
+    fs = cluster.getFileSystem();
+    blockGrpIdGenerator = cluster.getNamesystem().getBlockIdManager()
+        .getBlockGroupIdGenerator();
+    fs.mkdirs(eczone);
+    cluster.getFileSystem().getClient()
+        .createErasureCodingZone("/eczone", null, cellSize);
+  }
+
+  @After
+  public void teardown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test that blockGroup IDs are generating unique value.
+   */
+  @Test(timeout = 60000)
+  public void testBlockGroupIdGeneration() throws IOException {
+    long blockGroupIdInitialValue = blockGrpIdGenerator.getCurrentValue();
+
+    // Create a file that is 4 blocks long.
+    Path path = new Path(eczone, "testBlockGrpIdGeneration.dat");
+    DFSTestUtil.createFile(fs, path, cellSize, fileLen, blockSize, REPLICATION,
+        SEED);
+    List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs, path);
+    assertThat("Wrong BlockGrps", blocks.size(), is(blockGrpCount));
+
+    // initialising the block group generator for verifying the block id
+    blockGrpIdGenerator.setCurrentValue(blockGroupIdInitialValue);
+    // Ensure that the block IDs are generating unique value.
+    for (int i = 0; i < blocks.size(); ++i) {
+      blockGrpIdGenerator
+          .skipTo((blockGrpIdGenerator.getCurrentValue() & ~BLOCK_GROUP_INDEX_MASK)
+              + MAX_BLOCKS_IN_GROUP);
+      long nextBlockExpectedId = blockGrpIdGenerator.getCurrentValue();
+      long nextBlockGrpId = blocks.get(i).getBlock().getBlockId();
+      LOG.info("BlockGrp" + i + " id is " + nextBlockGrpId);
+      assertThat("BlockGrpId mismatches!", nextBlockGrpId,
+          is(nextBlockExpectedId));
+    }
+  }
+
+  /**
+   * Test that collisions in the blockGroup ID space are handled gracefully.
+   */
+  @Test(timeout = 60000)
+  public void testTriggerBlockGroupIdCollision() throws IOException {
+    long blockGroupIdInitialValue = blockGrpIdGenerator.getCurrentValue();
+
+    // Create a file with a few blocks to rev up the global block ID
+    // counter.
+    Path path1 = new Path(eczone, "testBlockGrpIdCollisionDetection_file1.dat");
+    DFSTestUtil.createFile(fs, path1, cellSize, fileLen, blockSize,
+        REPLICATION, SEED);
+    List<LocatedBlock> blocks1 = DFSTestUtil.getAllBlocks(fs, path1);
+    assertThat("Wrong BlockGrps", blocks1.size(), is(blockGrpCount));
+
+    // Rewind the block ID counter in the name system object. This will result
+    // in block ID collisions when we try to allocate new blocks.
+    blockGrpIdGenerator.setCurrentValue(blockGroupIdInitialValue);
+
+    // Trigger collisions by creating a new file.
+    Path path2 = new Path(eczone, "testBlockGrpIdCollisionDetection_file2.dat");
+    DFSTestUtil.createFile(fs, path2, cellSize, fileLen, blockSize,
+        REPLICATION, SEED);
+    List<LocatedBlock> blocks2 = DFSTestUtil.getAllBlocks(fs, path2);
+    assertThat("Wrong BlockGrps", blocks2.size(), is(blockGrpCount));
+
+    // Make sure that file1 and file2 block IDs are different
+    for (LocatedBlock locBlock1 : blocks1) {
+      long blockId1 = locBlock1.getBlock().getBlockId();
+      for (LocatedBlock locBlock2 : blocks2) {
+        long blockId2 = locBlock2.getBlock().getBlockId();
+        assertThat("BlockGrpId mismatches!", blockId1, is(not(blockId2)));
+      }
+    }
+  }
+
+  /**
+   * Test that collisions in the blockGroup ID when the id is occupied by legacy
+   * block.
+   */
+  @Test(timeout = 60000)
+  public void testTriggerBlockGroupIdCollisionWithLegacyBlockId()
+      throws Exception {
+    long blockGroupIdInitialValue = blockGrpIdGenerator.getCurrentValue();
+    blockGrpIdGenerator
+        .skipTo((blockGrpIdGenerator.getCurrentValue() & ~BLOCK_GROUP_INDEX_MASK)
+            + MAX_BLOCKS_IN_GROUP);
+    final long curBlockGroupIdValue = blockGrpIdGenerator.getCurrentValue();
+
+    // Creates contiguous block with negative blockId so that it would trigger
+    // collision during blockGroup Id generation
+    FSNamesystem fsn = cluster.getNamesystem();
+    // Replace SequentialBlockIdGenerator with a spy
+    SequentialBlockIdGenerator blockIdGenerator = spy(fsn.getBlockIdManager()
+        .getBlockIdGenerator());
+    Whitebox.setInternalState(fsn.getBlockIdManager(), "blockIdGenerator",
+        blockIdGenerator);
+    SequentialBlockIdGenerator spySequentialBlockIdGenerator = new SequentialBlockIdGenerator(
+        null) {
+      @Override
+      public long nextValue() {
+        return curBlockGroupIdValue;
+      }
+    };
+    final Answer<Object> delegator = new GenericTestUtils.DelegateAnswer(
+        spySequentialBlockIdGenerator);
+    doAnswer(delegator).when(blockIdGenerator).nextValue();
+
+    Path path1 = new Path("/testCollisionWithLegacyBlock_file1.dat");
+    DFSTestUtil.createFile(fs, path1, 1024, REPLICATION, SEED);
+
+    List<LocatedBlock> contiguousBlocks = DFSTestUtil.getAllBlocks(fs, path1);
+    assertThat(contiguousBlocks.size(), is(1));
+    Assert.assertEquals("Unexpected BlockId!", curBlockGroupIdValue,
+        contiguousBlocks.get(0).getBlock().getBlockId());
+
+    // Reset back to the initial value to trigger collision
+    blockGrpIdGenerator.setCurrentValue(blockGroupIdInitialValue);
+    // Trigger collisions by creating a new file.
+    Path path2 = new Path(eczone, "testCollisionWithLegacyBlock_file2.dat");
+    DFSTestUtil.createFile(fs, path2, cellSize, fileLen, blockSize,
+        REPLICATION, SEED);
+    List<LocatedBlock> blocks2 = DFSTestUtil.getAllBlocks(fs, path2);
+    assertThat("Wrong BlockGrps", blocks2.size(), is(blockGrpCount));
+
+    // Make sure that file1 and file2 block IDs are different
+    for (LocatedBlock locBlock1 : contiguousBlocks) {
+      long blockId1 = locBlock1.getBlock().getBlockId();
+      for (LocatedBlock locBlock2 : blocks2) {
+        long blockId2 = locBlock2.getBlock().getBlockId();
+        assertThat("BlockGrpId mismatches!", blockId1, is(not(blockId2)));
+      }
+    }
+  }
+}

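The new test advances the group-ID generator with skipTo((current & ~BLOCK_GROUP_INDEX_MASK) + MAX_BLOCKS_IN_GROUP), i.e. it clears the low bits that index the internal blocks of a group and jumps to the start of the next group. A standalone sketch of that arithmetic; the mask and group-size values below are assumptions for illustration, not the HdfsServerConstants values:

// Illustrative constants: assume the low 4 bits index the blocks within a group.
public final class BlockGroupIdSketch {
  static final long BLOCK_GROUP_INDEX_MASK = 0xF;
  static final long MAX_BLOCKS_IN_GROUP = BLOCK_GROUP_INDEX_MASK + 1;

  // Next group ID: current value rounded down to a group boundary, plus one group.
  static long nextGroupId(long currentValue) {
    return (currentValue & ~BLOCK_GROUP_INDEX_MASK) + MAX_BLOCKS_IN_GROUP;
  }

  public static void main(String[] args) {
    System.out.println(Long.toHexString(nextGroupId(0x23)));  // prints 30
  }
}
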
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
index de36e07..0f419ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -28,10 +31,21 @@ import static org.junit.Assert.fail;
 
 public class TestUnderReplicatedBlockQueues {
 
+  private final ECSchema ecSchema =
+      ErasureCodingSchemaManager.getSystemDefaultSchema();
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+
   private BlockInfo genBlockInfo(long id) {
     return new BlockInfoContiguous(new Block(id), (short) 3);
   }
 
+  private BlockInfo genStripedBlockInfo(long id, long numBytes) {
+    BlockInfoStriped sblk =  new BlockInfoStriped(new Block(id), ecSchema,
+        CELLSIZE);
+    sblk.setNumBytes(numBytes);
+    return sblk;
+  }
+
   /**
    * Test that adding blocks with different replication counts puts them
    * into different queues
@@ -85,6 +99,54 @@ public class TestUnderReplicatedBlockQueues {
     assertEquals(2, queues.getCorruptReplOneBlockSize());
   }
 
+  @Test
+  public void testStripedBlockPriorities() throws Throwable {
+    int dataBlkNum = ecSchema.getNumDataUnits();
+    int parityBlkNUm = ecSchema.getNumParityUnits();
+    doTestStripedBlockPriorities(1, parityBlkNUm);
+    doTestStripedBlockPriorities(dataBlkNum, parityBlkNUm);
+  }
+
+  private void doTestStripedBlockPriorities(int dataBlkNum, int parityBlkNum)
+      throws Throwable {
+    int groupSize = dataBlkNum + parityBlkNum;
+    long numBytes = CELLSIZE * dataBlkNum;
+    UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
+
+    // add a striped block which been left NUM_DATA_BLOCKS internal blocks
+    BlockInfo block1 = genStripedBlockInfo(-100, numBytes);
+    assertAdded(queues, block1, dataBlkNum, 0, groupSize);
+    assertEquals(1, queues.getUnderReplicatedBlockCount());
+    assertEquals(1, queues.size());
+    assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
+
+    // add a striped block which been left NUM_DATA_BLOCKS+1 internal blocks
+    BlockInfo block2 = genStripedBlockInfo(-200, numBytes);
+    assertAdded(queues, block2, dataBlkNum + 1, 0, groupSize);
+    assertEquals(2, queues.getUnderReplicatedBlockCount());
+    assertEquals(2, queues.size());
+    assertInLevel(queues, block2,
+        UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
+
+    // add a striped block which been left NUM_DATA_BLOCKS+2 internal blocks
+    BlockInfo block3 = genStripedBlockInfo(-300, numBytes);
+    assertAdded(queues, block3, dataBlkNum + 2, 0, groupSize);
+    assertEquals(3, queues.getUnderReplicatedBlockCount());
+    assertEquals(3, queues.size());
+    assertInLevel(queues, block3,
+        UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
+
+    // add a corrupted block
+    BlockInfo block_corrupt = genStripedBlockInfo(-400, numBytes);
+    assertEquals(0, queues.getCorruptBlockSize());
+    assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
+    assertEquals(4, queues.size());
+    assertEquals(3, queues.getUnderReplicatedBlockCount());
+    assertEquals(1, queues.getCorruptBlockSize());
+    assertInLevel(queues, block_corrupt,
+        UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+  }
+
   private void assertAdded(UnderReplicatedBlocks queues,
                            BlockInfo block,
                            int curReplicas,

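Read as a mapping from live internal blocks to queue level, the assertions in the new striped test amount to the rule sketched below. This is derived from the test above, not from the BlockManager code, and the numeric queue constants are assumptions:

// Plain-Java restatement of what the striped-priority test asserts.
public class StripedPrioritySketch {
  static final int QUEUE_HIGHEST_PRIORITY = 0;        // assumed numbering
  static final int QUEUE_VERY_UNDER_REPLICATED = 1;
  static final int QUEUE_UNDER_REPLICATED = 2;
  static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;

  static int priority(int liveInternalBlocks, int dataBlocks) {
    if (liveInternalBlocks < dataBlocks) {
      return QUEUE_WITH_CORRUPT_BLOCKS;       // too few blocks left to reconstruct
    } else if (liveInternalBlocks == dataBlocks) {
      return QUEUE_HIGHEST_PRIORITY;          // no redundancy left at all
    } else if (liveInternalBlocks == dataBlocks + 1) {
      return QUEUE_VERY_UNDER_REPLICATED;     // one unit of redundancy remaining
    } else {
      return QUEUE_UNDER_REPLICATED;
    }
  }
}
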
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 6e5f07c..82b77c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -100,7 +100,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   public static byte simulatedByte(Block b, long offsetInBlk) {
     byte firstByte = (byte) (b.getBlockId() & BYTE_MASK);
-    return (byte) ((firstByte + offsetInBlk) & BYTE_MASK);
+    return (byte) ((firstByte + offsetInBlk % 29) & BYTE_MASK);
   }
   
   public static final String CONFIG_PROPERTY_CAPACITY =

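The one-line change above adds "% 29" to the offset term, so the simulated content now repeats with period 29 inside a block instead of marching through the whole byte range with the raw offset. A standalone before/after sketch; the 0xff mask value is an assumption for illustration:

public final class SimulatedByteSketch {
  private static final long BYTE_MASK = 0xff;   // assumed mask value

  static byte simulatedByteOld(long blockId, long offsetInBlk) {
    byte firstByte = (byte) (blockId & BYTE_MASK);
    return (byte) ((firstByte + offsetInBlk) & BYTE_MASK);
  }

  static byte simulatedByteNew(long blockId, long offsetInBlk) {
    byte firstByte = (byte) (blockId & BYTE_MASK);
    return (byte) ((firstByte + offsetInBlk % 29) & BYTE_MASK);
  }

  public static void main(String[] args) {
    // With the modulus, offsets 0 and 29 of the same block yield the same byte.
    System.out.println(simulatedByteNew(42, 0) == simulatedByteNew(42, 29));  // true
  }
}
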
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
index 989e216..d8c651f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
@@ -227,15 +227,6 @@ public class TestIncrementalBrVariations {
     return new Block(10000000L, 100L, 1048576L);
   }
 
-  private static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
-      Block block, DatanodeStorage storage) {
-    ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
-    receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, BlockStatus.RECEIVED_BLOCK, null);
-    StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
-    reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
-    return reports;
-  }
-
   /**
    * Verify that the NameNode can learn about new storages from incremental
    * block reports.
@@ -251,8 +242,9 @@ public class TestIncrementalBrVariations {
     // Generate a report for a fake block on a fake storage.
     final String newStorageUuid = UUID.randomUUID().toString();
     final DatanodeStorage newStorage = new DatanodeStorage(newStorageUuid);
-    StorageReceivedDeletedBlocks[] reports = makeReportForReceivedBlock(
-        getDummyBlock(), newStorage);
+    StorageReceivedDeletedBlocks[] reports = DFSTestUtil.
+        makeReportForReceivedBlock(getDummyBlock(), BlockStatus.RECEIVED_BLOCK,
+            newStorage);
 
     // Send the report to the NN.
     cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
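
The local makeReportForReceivedBlock() helper removed above is replaced by a shared DFSTestUtil version that additionally takes the BlockStatus. Judging from the deleted code, the shared helper presumably looks roughly like the following sketch (the exact DFSTestUtil implementation may differ):

    public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
        Block block, BlockStatus blockStatus, DatanodeStorage storage) {
      // Wrap a single block into the one-element report structure the NameNode
      // RPC expects, with the status supplied by the caller instead of being
      // hard-coded to RECEIVED_BLOCK.
      ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
      receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null);
      StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
      reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
      return reports;
    }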

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index d3d814c..14503b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -34,11 +35,17 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -99,7 +106,7 @@ public class TestMover {
       final LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
       final List<MLocation> locations = MLocation.toLocations(lb);
       final MLocation ml = locations.get(0);
-      final DBlock db = mover.newDBlock(lb.getBlock().getLocalBlock(), locations);
+      final DBlock db = mover.newDBlock(lb, locations, null);
 
       final List<StorageType> storageTypes = new ArrayList<StorageType>(
           Arrays.asList(StorageType.DEFAULT, StorageType.DEFAULT));
@@ -409,4 +416,120 @@ public class TestMover {
       cluster.shutdown();
     }
   }
+
+  int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final static int stripesPerBlock = 4;
+  static int DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock;
+
+  static void initConfWithStripe(Configuration conf) {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
+    Dispatcher.setBlockMoveWaitTime(3000L);
+  }
+
+  @Test(timeout = 300000)
+  public void testMoverWithStripedFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConfWithStripe(conf);
+
+    // start 10 datanodes
+    int numOfDatanodes = 10;
+    int storagesPerDatanode = 2;
+    long capacity = 10 * DEFAULT_STRIPE_BLOCK_SIZE;
+    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      for (int j = 0; j < storagesPerDatanode; j++) {
+        capacities[i][j] = capacity;
+      }
+    }
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .storagesPerDatanode(storagesPerDatanode)
+        .storageTypes(new StorageType[][]{
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE}})
+        .storageCapacities(capacities)
+        .build();
+
+    try {
+      cluster.waitActive();
+
+      // set "/bar" directory with HOT storage policy.
+      ClientProtocol client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+      String barDir = "/bar";
+      client.mkdirs(barDir, new FsPermission((short) 0777), true);
+      client.setStoragePolicy(barDir,
+          HdfsConstants.HOT_STORAGE_POLICY_NAME);
+      // set "/bar" directory with EC zone.
+      client.createErasureCodingZone(barDir, null, 0);
+
+      // write file to barDir
+      final String fooFile = "/bar/foo";
+      long fileLen = 20 * DEFAULT_STRIPE_BLOCK_SIZE;
+      DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
+          fileLen, (short) 3, 0);
+
+      // verify storage types and locations
+      LocatedBlocks locatedBlocks =
+          client.getBlockLocations(fooFile, 0, fileLen);
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        for (StorageType type : lb.getStorageTypes()) {
+          Assert.assertEquals(StorageType.DISK, type);
+        }
+      }
+      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+          dataBlocks + parityBlocks);
+
+      // start 5 more datanodes
+      numOfDatanodes += 5;
+      capacities = new long[5][storagesPerDatanode];
+      for (int i = 0; i < 5; i++) {
+        for (int j = 0; j < storagesPerDatanode; j++) {
+          capacities[i][j] = capacity;
+        }
+      }
+      cluster.startDataNodes(conf, 5,
+          new StorageType[][]{
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+          true, null, null, null, capacities, null, false, false, false, null);
+      cluster.triggerHeartbeats();
+
+      // move file to ARCHIVE
+      client.setStoragePolicy(barDir, "COLD");
+      // run Mover
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] { "-p", barDir });
+      Assert.assertEquals("Movement to ARCHIVE should be successfull", 0, rc);
+
+      // verify storage types and locations
+      locatedBlocks = client.getBlockLocations(fooFile, 0, fileLen);
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        for (StorageType type : lb.getStorageTypes()) {
+          Assert.assertEquals(StorageType.ARCHIVE, type);
+        }
+      }
+      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+          dataBlocks + parityBlocks);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
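
The capacity and verification numbers in testMoverWithStripedFile follow from simple arithmetic: each block group stores dataBlocks full-size internal data blocks plus parityBlocks parity blocks, so a file of 20 * DEFAULT_STRIPE_BLOCK_SIZE bytes spans ceil(20 / dataBlocks) block groups, and each group should report dataBlocks + parityBlocks storages to verifyLocatedStripedBlocks. A small sketch of that arithmetic, with helper names that are illustrative only:

    // Number of block groups needed for fileLen bytes of striped data.
    static long expectedBlockGroups(long fileLen, int dataBlocks, long blockSize) {
      long bytesPerGroup = (long) dataBlocks * blockSize;
      return (fileLen + bytesPerGroup - 1) / bytesPerGroup;   // ceiling division
    }

    // Total internal blocks (data + parity) the NameNode should report.
    static long expectedInternalBlocks(long fileLen, int dataBlocks,
        int parityBlocks, long blockSize) {
      return expectedBlockGroups(fileLen, dataBlocks, blockSize)
          * (dataBlocks + parityBlocks);
    }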

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index b314584..a2cb434 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -71,8 +71,13 @@ public class NameNodeAdapter {
   public static HdfsFileStatus getFileInfo(NameNode namenode, String src,
       boolean resolveLink) throws AccessControlException, UnresolvedLinkException,
         StandbyException, IOException {
-    return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem()
-            .getFSDirectory(), src, resolveLink);
+    namenode.getNamesystem().readLock();
+    try {
+      return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem()
+          .getFSDirectory(), src, resolveLink);
+    } finally {
+      namenode.getNamesystem().readUnlock();
+    }
   }
   
   public static boolean mkdirs(NameNode namenode, String src,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
new file mode 100644
index 0000000..337911d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestAddOverReplicatedStripedBlocks {
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final short GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final int NUM_STRIPE_PER_BLOCK = 4;
+  private final int BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
+  private final int numDNs = GROUP_SIZE + 3;
+
+  @Before
+  public void setup() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    // disable block recovery
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    SimulatedFSDataset.setFactory(conf);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.mkdirs(dirPath);
+    fs.getClient().createErasureCodingZone(dirPath.toString(), null, CELLSIZE);
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testProcessOverReplicatedStripedBlock() throws Exception {
+    // create a file which has exactly one block group on the first GROUP_SIZE DNs
+    long fileLen = DATA_BLK_NUM * BLOCK_SIZE;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, 1,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    long gs = bg.getBlock().getGenerationStamp();
+    String bpid = bg.getBlock().getBlockPoolId();
+    long groupId = bg.getBlock().getBlockId();
+    Block blk = new Block(groupId, BLOCK_SIZE, gs);
+    for (int i = 0; i < GROUP_SIZE; i++) {
+      blk.setBlockId(groupId + i);
+      cluster.injectBlocks(i, Arrays.asList(blk), bpid);
+    }
+    cluster.triggerBlockReports();
+
+    // let an internal block be over-replicated with 2 redundant blocks.
+    blk.setBlockId(groupId + 2);
+    cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
+    cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
+    // let an internal block be over-replicated with 1 redundant block.
+    blk.setBlockId(groupId + 6);
+    cluster.injectBlocks(numDNs - 1, Arrays.asList(blk), bpid);
+
+    // update blocksMap
+    cluster.triggerBlockReports();
+    // add to invalidates
+    cluster.triggerHeartbeats();
+    // datanode delete block
+    cluster.triggerHeartbeats();
+    // update blocksMap
+    cluster.triggerBlockReports();
+
+    // verify that all internal blocks exist
+    lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+  }
+
+  @Test
+  public void testProcessOverReplicatedSBSmallerThanFullBlocks()
+      throws Exception {
+    // Create an EC file which doesn't fill full internal blocks.
+    int fileLen = CELLSIZE * (DATA_BLK_NUM - 1);
+    byte[] content = new byte[fileLen];
+    DFSTestUtil.writeFile(fs, filePath, new String(content));
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    long gs = bg.getBlock().getGenerationStamp();
+    String bpid = bg.getBlock().getBlockPoolId();
+    long groupId = bg.getBlock().getBlockId();
+    Block blk = new Block(groupId, BLOCK_SIZE, gs);
+    cluster.triggerBlockReports();
+    List<DatanodeInfo> infos = Arrays.asList(bg.getLocations());
+
+    // let an internal block be over-replicated with 2 redundant blocks.
+    // Therefore the number of internal blocks exceeds GROUP_SIZE (5 data blocks +
+    // 3 parity blocks + 2 redundant blocks > GROUP_SIZE).
+    blk.setBlockId(groupId + 2);
+    List<DataNode> dataNodeList = cluster.getDataNodes();
+    for (int i = 0; i < numDNs; i++) {
+      if (!infos.contains(dataNodeList.get(i).getDatanodeId())) {
+        cluster.injectBlocks(i, Arrays.asList(blk), bpid);
+        System.out.println("XXX: inject block into datanode " + i);
+      }
+    }
+
+    // update blocksMap
+    cluster.triggerBlockReports();
+    // add to invalidates
+    cluster.triggerHeartbeats();
+    // datanode delete block
+    cluster.triggerHeartbeats();
+    // update blocksMap
+    cluster.triggerBlockReports();
+
+    // verify that all internal blocks exist
+    lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+  }
+
+  @Test
+  public void testProcessOverReplicatedAndCorruptStripedBlock()
+      throws Exception {
+    long fileLen = DATA_BLK_NUM * BLOCK_SIZE;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, 1,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    long gs = bg.getBlock().getGenerationStamp();
+    String bpid = bg.getBlock().getBlockPoolId();
+    long groupId = bg.getBlock().getBlockId();
+    Block blk = new Block(groupId, BLOCK_SIZE, gs);
+    BlockInfoStriped blockInfo = new BlockInfoStriped(blk,
+        ErasureCodingSchemaManager.getSystemDefaultSchema(), CELLSIZE);
+    for (int i = 0; i < GROUP_SIZE; i++) {
+      blk.setBlockId(groupId + i);
+      cluster.injectBlocks(i, Arrays.asList(blk), bpid);
+    }
+    cluster.triggerBlockReports();
+
+    // let an internal block be corrupt
+    BlockManager bm = cluster.getNamesystem().getBlockManager();
+    List<DatanodeInfo> infos = Arrays.asList(bg.getLocations());
+    List<String> storages = Arrays.asList(bg.getStorageIDs());
+    cluster.getNamesystem().writeLock();
+    try {
+      bm.findAndMarkBlockAsCorrupt(lbs.getLastLocatedBlock().getBlock(),
+          infos.get(0), storages.get(0), "TEST");
+    } finally {
+      cluster.getNamesystem().writeUnlock();
+    }
+    assertEquals(1, bm.countNodes(blockInfo).corruptReplicas());
+
+    // let an internal block be over-replicated with 2 redundant blocks.
+    blk.setBlockId(groupId + 2);
+    cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
+    cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
+
+    // update blocksMap
+    cluster.triggerBlockReports();
+    // add to invalidates
+    cluster.triggerHeartbeats();
+    // datanode delete block
+    cluster.triggerHeartbeats();
+    // update blocksMap
+    cluster.triggerBlockReports();
+
+    // verify that all internal blocks exist
+    lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+  }
+
+  @Test
+  public void testProcessOverReplicatedAndMissingStripedBlock()
+      throws Exception {
+    long fileLen = CELLSIZE * DATA_BLK_NUM;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, 1,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    long gs = bg.getBlock().getGenerationStamp();
+    String bpid = bg.getBlock().getBlockPoolId();
+    long groupId = bg.getBlock().getBlockId();
+    Block blk = new Block(groupId, BLOCK_SIZE, gs);
+    // only inject GROUP_SIZE - 1 blocks, so there is one block missing
+    for (int i = 0; i < GROUP_SIZE - 1; i++) {
+      blk.setBlockId(groupId + i);
+      cluster.injectBlocks(i, Arrays.asList(blk), bpid);
+    }
+    cluster.triggerBlockReports();
+
+    // let an internal block be over-replicated with 2 redundant blocks.
+    // Therefore the number of internal blocks exceeds GROUP_SIZE (5 data blocks +
+    // 3 parity blocks + 2 redundant blocks > GROUP_SIZE).
+    blk.setBlockId(groupId + 2);
+    cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
+    cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
+
+    // update blocksMap
+    cluster.triggerBlockReports();
+    // add to invalidates
+    cluster.triggerHeartbeats();
+    // datanode delete block
+    cluster.triggerHeartbeats();
+    // update blocksMap
+    cluster.triggerBlockReports();
+
+    // Since one block is missing, once the over-replicated blocks have been
+    // deleted we are left with GROUP_SIZE - 1 blocks.
+    lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+  }
+
+}
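
These tests rely on the convention, visible in blk.setBlockId(groupId + i), that the i-th internal block of a striped block group uses the group ID plus its index, and on a fixed trigger sequence (report, heartbeat, heartbeat, report) to let the NameNode schedule and the DataNodes perform deletion of the extra replicas. A small sketch of both ideas; the helper names are illustrative, not taken from the patch:

    // ID of the i-th internal block within a striped block group.
    static long internalBlockId(long blockGroupId, int indexInGroup) {
      return blockGroupId + indexInGroup;
    }

    // Drive over-replication processing end to end on a MiniDFSCluster:
    // report the injected blocks, let the NameNode add the extras to the
    // invalidates list, let the DataNodes delete them, then report again
    // so the blocksMap reflects the final state.
    static void processOverReplication(MiniDFSCluster cluster) throws Exception {
      cluster.triggerBlockReports();   // update blocksMap
      cluster.triggerHeartbeats();     // add to invalidates
      cluster.triggerHeartbeats();     // datanodes delete blocks
      cluster.triggerBlockReports();   // update blocksMap again
    }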

