hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ji...@apache.org
Subject hadoop git commit: HDFS-8798. Erasure Coding: fix DFSStripedInputStream/DFSStripedOutputStream re-fetch token when expired. Contributed by Walter Su.
Date Fri, 24 Jul 2015 20:53:00 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 c2c26e6ea -> 95b499a36


HDFS-8798. Erasure Coding: fix DFSStripedInputStream/DFSStripedOutputStream re-fetch token
when expired. Contributed by Walter Su.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/95b499a3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/95b499a3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/95b499a3

Branch: refs/heads/HDFS-7285
Commit: 95b499a3671daae9018ae005c9384fb65aa37320
Parents: c2c26e6
Author: Jing Zhao <jing9@apache.org>
Authored: Fri Jul 24 13:52:50 2015 -0700
Committer: Jing Zhao <jing9@apache.org>
Committed: Fri Jul 24 13:52:50 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../hadoop/hdfs/DFSStripedInputStream.java      |  93 ++++++-----
 .../apache/hadoop/hdfs/StripedDataStreamer.java |   5 +-
 .../hadoop/hdfs/util/StripedBlockUtil.java      |   4 +-
 .../hadoop/hdfs/TestDFSStripedInputStream.java  |   1 -
 .../TestDFSStripedOutputStreamWithFailure.java  | 154 ++++++++++---------
 .../hdfs/server/balancer/TestBalancer.java      |   9 ++
 .../TestBlockTokenWithDFSStriped.java           |  44 +++---
 8 files changed, 159 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/95b499a3/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 9741585..2f7a88a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -373,3 +373,6 @@
 
     HDFS-8813. Erasure Coding: Client no need to decode missing parity blocks.
     (Walter Su via jing9)
+
+    HDFS-8798. Erasure Coding: fix DFSStripedInputStream/DFSStripedOutputStream
+    re-fetch token when expired. (Walter Su via jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95b499a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 4f3a8ed..1f64d4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -44,7 +43,6 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -207,44 +205,6 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   /**
-   * @throws IOException only when failing to refetch block token, which happens
-   * when this client cannot get located block information from NameNode. This
-   * method returns null instead of throwing exception when failing to connect
-   * to the DataNode.
-   */
-  private BlockReader getBlockReaderWithRetry(LocatedBlock targetBlock,
-      long offsetInBlock, long length, InetSocketAddress targetAddr,
-      StorageType storageType, DatanodeInfo datanode, long offsetInFile,
-      ReaderRetryPolicy retry) throws IOException {
-    // only need to get a new access token or a new encryption key once
-    while (true) {
-      try {
-        return getBlockReader(targetBlock, offsetInBlock, length, targetAddr,
-            storageType, datanode);
-      } catch (IOException e) {
-        if (e instanceof InvalidEncryptionKeyException &&
-            retry.shouldRefetchEncryptionKey()) {
-          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
-              + "encryption key was invalid when connecting to " + targetAddr
-              + " : " + e);
-          dfsClient.clearDataEncryptionKey();
-          retry.refetchEncryptionKey();
-        } else if (retry.shouldRefetchToken() &&
-            tokenRefetchNeeded(e, targetAddr)) {
-          fetchBlockAt(offsetInFile);
-          retry.refetchToken();
-        } else {
-          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
-              + ", add to deadNodes and continue.", e);
-          // Put chosen node into dead list, continue
-          addToDeadNodes(datanode);
-          return null;
-        }
-      }
-    }
-  }
-
-  /**
    * Extend the super method with the logic of switching between cells.
    * When reaching the end of a cell, proceed to the next cell and read it
    * with the next blockReader.
@@ -293,13 +253,13 @@ public class DFSStripedInputStream extends DFSInputStream {
     final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
     final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
         - (stripeIndex * stripeLen), stripeLen);
-    curStripeRange = new StripeRange(offsetInBlockGroup,
+    StripeRange stripeRange = new StripeRange(offsetInBlockGroup,
         stripeLimit - stripeBufOffset);
 
     LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
     AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(schema, cellSize,
         blockGroup, offsetInBlockGroup,
-        offsetInBlockGroup + curStripeRange.length - 1, curStripeBuf);
+        offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
         blockGroup, cellSize, dataBlkNum, parityBlkNum);
     // read the whole stripe
@@ -311,6 +271,7 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
     curStripeBuf.position(stripeBufOffset);
     curStripeBuf.limit(stripeLimit);
+    curStripeRange = stripeRange;
   }
 
   private Callable<Void> readCells(final BlockReader reader,
@@ -423,7 +384,6 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
     Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
         new ConcurrentHashMap<>();
-    failures = 0;
     if (pos < getFileLength()) {
       try {
         if (pos > blockEnd) {
@@ -623,13 +583,46 @@ public class DFSStripedInputStream extends DFSInputStream {
 
     boolean createBlockReader(LocatedBlock block, int chunkIndex)
         throws IOException {
-      DNAddrPair dnInfo = getBestNodeDNAddrPair(block, null);
-      if (dnInfo != null) {
-        BlockReader reader = getBlockReaderWithRetry(block,
-            alignedStripe.getOffsetInBlock(),
-            block.getBlockSize() - alignedStripe.getOffsetInBlock(),
-            dnInfo.addr, dnInfo.storageType, dnInfo.info,
-            block.getStartOffset(), new ReaderRetryPolicy());
+      BlockReader reader = null;
+      final ReaderRetryPolicy retry = new ReaderRetryPolicy();
+      DNAddrPair dnInfo = new DNAddrPair(null, null, null);
+
+      while(true) {
+        try {
+          // the cached block location might have been re-fetched, so always
+          // get it from cache.
+          block = refreshLocatedBlock(block);
+          targetBlocks[chunkIndex] = block;
+
+          // internal block has one location, just rule out the deadNodes
+          dnInfo = getBestNodeDNAddrPair(block, null);
+          if (dnInfo == null) {
+            break;
+          }
+          reader = getBlockReader(block, alignedStripe.getOffsetInBlock(),
+              block.getBlockSize() - alignedStripe.getOffsetInBlock(),
+              dnInfo.addr, dnInfo.storageType, dnInfo.info);
+        } catch (IOException e) {
+          if (e instanceof InvalidEncryptionKeyException &&
+              retry.shouldRefetchEncryptionKey()) {
+            DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+                + "encryption key was invalid when connecting to " + dnInfo.addr
+                + " : " + e);
+            dfsClient.clearDataEncryptionKey();
+            retry.refetchEncryptionKey();
+          } else if (retry.shouldRefetchToken() &&
+              tokenRefetchNeeded(e, dnInfo.addr)) {
+            fetchBlockAt(block.getStartOffset());
+            retry.refetchToken();
+          } else {
+            //TODO: handles connection issues
+            DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
+                "block" + block.getBlock(), e);
+            // re-fetch the block in case the block has been moved
+            fetchBlockAt(block.getStartOffset());
+            addToDeadNodes(dnInfo.info);
+          }
+        }
         if (reader != null) {
           readerInfos[chunkIndex] = new BlockReaderInfo(reader, block,
               dnInfo.info, alignedStripe.getOffsetInBlock());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95b499a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index a177796..2d51dc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -195,12 +195,15 @@ public class StripedDataStreamer extends DataStreamer {
         final ExtendedBlock bg = coordinator.getBlockGroup();
         final LocatedBlock updated = callUpdateBlockForPipeline(bg);
         final long newGS = updated.getBlock().getGenerationStamp();
+        final LocatedBlock[] updatedBlks = StripedBlockUtil
+            .parseStripedBlockGroup((LocatedStripedBlock) updated,
+                BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
         for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
           final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
           if (bi != null) {
             final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
                 null, null, null, -1, updated.isCorrupt(), null);
-            lb.setBlockToken(updated.getBlockToken());
+            lb.setBlockToken(updatedBlks[i].getBlockToken());
             newBlocks.offer(i, lb);
           } else {
             final LocatedBlock lb = coordinator.getFollowingBlocks().peek(i);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95b499a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index a3ee1e8..4dc94a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -119,8 +119,8 @@ public class StripedBlockUtil {
           bg.getStartOffset(), bg.isCorrupt(), null);
     }
     Token<BlockTokenIdentifier>[] blockTokens = bg.getBlockTokens();
-    if (idxInBlockGroup < blockTokens.length) {
-      locatedBlock.setBlockToken(blockTokens[idxInBlockGroup]);
+    if (idxInReturnedLocs < blockTokens.length) {
+      locatedBlock.setBlockToken(blockTokens[idxInReturnedLocs]);
     }
     return locatedBlock;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95b499a3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index c520d2c..baf6106 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.junit.After;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95b499a3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 8944cde..54fcdf8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -25,22 +25,27 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.base.Preconditions;
@@ -63,17 +68,13 @@ public class TestDFSStripedOutputStreamWithFailure {
   private static final int FLUSH_POS
       = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
 
-  private final HdfsConfiguration conf = new HdfsConfiguration();
   private MiniDFSCluster cluster;
   private DistributedFileSystem dfs;
   private final Path dir = new Path("/"
       + TestDFSStripedOutputStreamWithFailure.class.getSimpleName());
 
-
-  @Before
-  public void setup() throws IOException {
+  private void setup(Configuration conf) throws IOException {
     final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.waitActive();
     dfs = cluster.getFileSystem();
@@ -81,8 +82,7 @@ public class TestDFSStripedOutputStreamWithFailure {
     dfs.createErasureCodingZone(dir, null, 0);
   }
 
-  @After
-  public void tearDown() {
+  private void tearDown() {
     if (cluster != null) {
       cluster.shutdown();
     }
@@ -92,89 +92,76 @@ public class TestDFSStripedOutputStreamWithFailure {
     return (byte)pos;
   }
 
-  @Test(timeout=120000)
-  public void testDatanodeFailure0() {
-    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
-    final int dn = 0;
-    runTest("file" + dn, length, dn);
-  }
-
-  @Test(timeout=120000)
-  public void testDatanodeFailure1() {
-    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
-    final int dn = 1;
-    runTest("file" + dn, length, dn);
-  }
-
-  @Test(timeout=120000)
-  public void testDatanodeFailure2() {
-    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
-    final int dn = 2;
-    runTest("file" + dn, length, dn);
-  }
-
-  @Test(timeout=120000)
-  public void testDatanodeFailure3() {
-    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
-    final int dn = 3;
-    runTest("file" + dn, length, dn);
-  }
-
-  @Test(timeout=120000)
-  public void testDatanodeFailure4() {
-    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
-    final int dn = 4;
-    runTest("file" + dn, length, dn);
-  }
-
-  @Test(timeout=120000)
-  public void testDatanodeFailure5() {
-    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
-    final int dn = 5;
-    runTest("file" + dn, length, dn);
-  }
-
-  @Test(timeout=120000)
-  public void testDatanodeFailure6() {
-    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
-    final int dn = 6;
-    runTest("file" + dn, length, dn);
+  private void initConf(Configuration conf){
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
   }
 
-  @Test(timeout=120000)
-  public void testDatanodeFailure7() {
-    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
-    final int dn = 7;
-    runTest("file" + dn, length, dn);
+  private void initConfWithBlockToken(Configuration conf) {
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.setInt("ipc.client.connect.max.retries", 0);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
   }
 
-  @Test(timeout=120000)
-  public void testDatanodeFailure8() {
+  @Test(timeout=240000)
+  public void testDatanodeFailure() throws Exception {
     final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
-    final int dn = 8;
-    runTest("file" + dn, length, dn);
+    HdfsConfiguration conf = new HdfsConfiguration();
+    initConf(conf);
+    for (int dn = 0; dn < 9; dn++) {
+      try {
+        setup(conf);
+        cluster.startDataNodes(conf, 1, true, null, null);
+        cluster.waitActive();
+        runTest(new Path(dir, "file" + dn), length, length / 2, dn, false);
+      } catch (Exception e) {
+        LOG.error("failed, dn=" + dn + ", length=" + length);
+        throw e;
+      } finally {
+        tearDown();
+      }
+    }
   }
 
-  private void runTest(final String src, final int length, final int dnIndex) {
-    try {
-      cluster.startDataNodes(conf, 1, true, null, null);
-      cluster.waitActive();
-
-      runTest(new Path(dir, src), length, length/2, dnIndex);
-    } catch(Exception e) {
-      LOG.info("FAILED", e);
-      Assert.fail(StringUtils.stringifyException(e));
+  @Test(timeout=240000)
+  public void testBlockTokenExpired() throws Exception {
+    final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE);
+    HdfsConfiguration conf = new HdfsConfiguration();
+    initConf(conf);
+    initConfWithBlockToken(conf);
+    for (int dn = 0; dn < 9; dn += 2) {
+      try {
+        setup(conf);
+        cluster.startDataNodes(conf, 1, true, null, null);
+        cluster.waitActive();
+        runTest(new Path(dir, "file" + dn), length, length / 2, dn, true);
+      } catch (Exception e) {
+        LOG.error("failed, dn=" + dn + ", length=" + length);
+        throw e;
+      } finally {
+        tearDown();
+      }
     }
   }
 
   private void runTest(final Path p, final int length, final int killPos,
-      final int dnIndex) throws Exception {
+      final int dnIndex, final boolean tokenExpire) throws Exception {
     LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos
         + ", dnIndex=" + dnIndex);
     Preconditions.checkArgument(killPos < length);
     Preconditions.checkArgument(killPos > FLUSH_POS);
     final String fullPath = p.toString();
 
+    final NameNode nn = cluster.getNameNode();
+    final BlockManager bm = nn.getNamesystem().getBlockManager();
+    final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
+
+    if (tokenExpire) {
+      // set a short token lifetime (1 second)
+      SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
+    }
+
     final AtomicInteger pos = new AtomicInteger();
     final FSDataOutputStream out = dfs.create(p);
     final DFSStripedOutputStream stripedOut
@@ -189,6 +176,11 @@ public class TestDFSStripedOutputStreamWithFailure {
         Assert.assertTrue(oldGS != -1);
         Assert.assertEquals(oldGS, gs);
 
+        if (tokenExpire) {
+          DFSTestUtil.flushInternal(stripedOut);
+          waitTokenExpires(out);
+        }
+
         killDatanode(cluster, stripedOut, dnIndex, pos);
         killed = true;
       }
@@ -348,4 +340,14 @@ public class TestDFSStripedOutputStreamWithFailure {
           killedDnIndex - dataBlockBytes.length);
     }
   }
+
+  private void waitTokenExpires(FSDataOutputStream out) throws IOException {
+    Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
+    while (!SecurityTestUtil.isBlockTokenExpired(token)) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ignored) {
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95b499a3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 759eb45..8239e5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -1469,10 +1469,19 @@ public class TestBalancer {
     }
   }
 
+  public void integrationTestWithStripedFile(Configuration conf) throws Exception {
+    initConfWithStripe(conf);
+    doTestBalancerWithStripedFile(conf);
+  }
+
   @Test(timeout = 100000)
   public void testBalancerWithStripedFile() throws Exception {
     Configuration conf = new Configuration();
     initConfWithStripe(conf);
+    doTestBalancerWithStripedFile(conf);
+  }
+
+  private void doTestBalancerWithStripedFile(Configuration conf) throws Exception {
     int numOfDatanodes = dataBlocks + parityBlocks + 2;
     int numOfRacks = dataBlocks;
     long capacity = 20 * DEFAULT_STRIPE_BLOCK_SIZE;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95b499a3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
index e212917..f985f54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
@@ -20,13 +20,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -46,22 +44,6 @@ public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
     FILE_SIZE =  BLOCK_SIZE * dataBlocks * 3;
   }
 
-  @Before
-  public void setup() throws IOException {
-    conf = getConf();
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
-    cluster.getFileSystem().getClient()
-        .createErasureCodingZone("/", null, cellSize);
-    cluster.waitActive();
-  }
-
-  @After
-  public void tearDown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
   private Configuration getConf() {
     Configuration conf = super.getConf(numDNs);
     conf.setInt("io.bytes.per.checksum", cellSize);
@@ -71,14 +53,26 @@ public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
   @Test
   @Override
   public void testRead() throws Exception {
-    //TODO: DFSStripedInputStream handles token expiration
-//    doTestRead(conf, cluster, true);
+    conf = getConf();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient()
+        .createErasureCodingZone("/", null, cellSize);
+    try {
+      cluster.waitActive();
+      doTestRead(conf, cluster, true);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
   }
 
+  /**
+   * tested at {@link org.apache.hadoop.hdfs.TestDFSStripedOutputStreamWithFailure#testBlockTokenExpired()}
+   */
   @Test
   @Override
-  public void testWrite() throws Exception {
-    //TODO: DFSStripedOutputStream handles token expiration
+  public void testWrite(){
   }
 
   @Test
@@ -90,7 +84,9 @@ public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
   @Test
   @Override
   public void testEnd2End() throws Exception {
-    //TODO: DFSStripedOutputStream handles token expiration
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    new TestBalancer().integrationTestWithStripedFile(conf);
   }
 
   @Override


Mime
View raw message