hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From weic...@apache.org
Subject hadoop git commit: HDFS-10609. Uncaught InvalidEncryptionKeyException during pipeline recovery may abort downstream applications. Contributed by Wei-Chiu Chuang.
Date Wed, 05 Oct 2016 20:30:01 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 599146d10 -> 039c3a735


HDFS-10609. Uncaught InvalidEncryptionKeyException during pipeline recovery may abort downstream
applications. Contributed by Wei-Chiu Chuang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/039c3a73
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/039c3a73
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/039c3a73

Branch: refs/heads/branch-2.7
Commit: 039c3a735192ac05209af89c0cc74a27c118a21f
Parents: 599146d
Author: Wei-Chiu Chuang <weichiu@apache.org>
Authored: Wed Oct 5 13:29:20 2016 -0700
Committer: Wei-Chiu Chuang <weichiu@apache.org>
Committed: Wed Oct 5 13:29:20 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   5 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 147 +++-
 .../block/BlockPoolTokenSecretManager.java      |   3 +-
 .../token/block/BlockTokenSecretManager.java    |   6 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |   5 +
 .../hadoop/hdfs/TestEncryptedTransfer.java      | 719 ++++++++-----------
 6 files changed, 421 insertions(+), 464 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 58d93cd..1a6a96b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -2188,6 +2188,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  @VisibleForTesting
+  public DataEncryptionKey getEncryptionKey() {
+    return encryptionKey;
+  }
+
   /**
    * Get the checksum of the whole file of a range of the file. Note that the
    * range always starts from the beginning of the file.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index f8c8592..ef8aa5a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -225,6 +225,89 @@ public class DFSOutputStream extends FSOutputSummer
   // if them are received, the DataStreamer closes the current block.
   //
   class DataStreamer extends Daemon {
+    private class RefetchEncryptionKeyPolicy {
+      private int fetchEncryptionKeyTimes = 0;
+      private InvalidEncryptionKeyException lastException;
+      private final DatanodeInfo src;
+
+      RefetchEncryptionKeyPolicy(DatanodeInfo src) {
+        this.src = src;
+      }
+      boolean continueRetryingOrThrow() throws InvalidEncryptionKeyException {
+        if (fetchEncryptionKeyTimes >= 2) {
+          // hit the same exception twice connecting to the node, so
+          // throw the exception and exclude the node.
+          throw lastException;
+        }
+        // Don't exclude this node just yet.
+        // Try again with a new encryption key.
+        DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+            + "encryption key was invalid when connecting to "
+            + this.src + ": ", lastException);
+        // The encryption key used is invalid.
+        dfsClient.clearDataEncryptionKey();
+        return true;
+      }
+
+      /**
+       * Record a connection exception.
+       * @param e
+       * @throws InvalidEncryptionKeyException
+       */
+      void recordFailure(final InvalidEncryptionKeyException e)
+          throws InvalidEncryptionKeyException {
+        fetchEncryptionKeyTimes++;
+        lastException = e;
+      }
+    }
+
+    private class StreamerStreams implements java.io.Closeable {
+      private Socket sock = null;
+      private DataOutputStream out = null;
+      private DataInputStream in = null;
+
+      StreamerStreams(final DatanodeInfo src,
+          final long writeTimeout, final long readTimeout,
+          final Token<BlockTokenIdentifier> blockToken)
+          throws IOException {
+        sock = createSocketForPipeline(src, 2, dfsClient);
+
+        OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+        InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
+        IOStreamPair saslStreams = dfsClient.saslClient
+            .socketSend(sock, unbufOut, unbufIn, dfsClient, blockToken, src);
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
+        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+            HdfsConstants.SMALL_BUFFER_SIZE));
+        in = new DataInputStream(unbufIn);
+      }
+
+      void sendTransferBlock(final DatanodeInfo[] targets,
+          final StorageType[] targetStorageTypes,
+          final Token<BlockTokenIdentifier> blockToken) throws IOException {
+        //send the TRANSFER_BLOCK request
+        new Sender(out)
+            .transferBlock(block, blockToken, dfsClient.clientName, targets,
+                targetStorageTypes);
+        out.flush();
+        //ack
+        BlockOpResponseProto transferResponse = BlockOpResponseProto
+            .parseFrom(PBHelper.vintPrefixed(in));
+        if (SUCCESS != transferResponse.getStatus()) {
+          throw new IOException("Failed to add a datanode. Response status: "
+              + transferResponse.getStatus());
+        }
+      }
+
+      @Override
+      public void close() throws IOException {
+        IOUtils.closeStream(in);
+        IOUtils.closeStream(out);
+        IOUtils.closeSocket(sock);
+      }
+    }
+
     private volatile boolean streamerClosed = false;
     private volatile ExtendedBlock block; // its length is number of bytes acked
     private Token<BlockTokenIdentifier> accessToken;
@@ -1010,48 +1093,38 @@ public class DFSOutputStream extends FSOutputSummer
          new IOException("Failed to add a node");
     }
 
+    private long computeTransferWriteTimeout() {
+      return dfsClient.getDatanodeWriteTimeout(2);
+    }
+    private long computeTransferReadTimeout() {
+      // transfer timeout multiplier based on the transfer size
+      // One per 200 packets = 12.8MB. Minimum is 2.
+      int multi = 2
+          + (int) (bytesSent / dfsClient.getConf().writePacketSize) / 200;
+      return dfsClient.getDatanodeReadTimeout(multi);
+    }
+
     private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
         final StorageType[] targetStorageTypes,
         final Token<BlockTokenIdentifier> blockToken) throws IOException {
       //transfer replica to the new datanode
-      Socket sock = null;
-      DataOutputStream out = null;
-      DataInputStream in = null;
-      try {
-        sock = createSocketForPipeline(src, 2, dfsClient);
-        final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
-        
-        // transfer timeout multiplier based on the transfer size
-        // One per 200 packets = 12.8MB. Minimum is 2.
-        int multi = 2 + (int)(bytesSent/dfsClient.getConf().writePacketSize)/200;
-        final long readTimeout = dfsClient.getDatanodeReadTimeout(multi);
-
-        OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
-        InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
-        IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
-          unbufOut, unbufIn, dfsClient, blockToken, src);
-        unbufOut = saslStreams.out;
-        unbufIn = saslStreams.in;
-        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            HdfsConstants.SMALL_BUFFER_SIZE));
-        in = new DataInputStream(unbufIn);
-
-        //send the TRANSFER_BLOCK request
-        new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
-            targets, targetStorageTypes);
-        out.flush();
-
-        //ack
-        BlockOpResponseProto response =
-          BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
-        if (SUCCESS != response.getStatus()) {
-          throw new IOException("Failed to add a datanode");
+      RefetchEncryptionKeyPolicy policy = new RefetchEncryptionKeyPolicy(src);
+      do {
+        StreamerStreams streams = null;
+        try {
+          final long writeTimeout = computeTransferWriteTimeout();
+          final long readTimeout = computeTransferReadTimeout();
+
+          streams = new StreamerStreams(src, writeTimeout, readTimeout,
+              blockToken);
+          streams.sendTransferBlock(targets, targetStorageTypes, blockToken);
+          return;
+        } catch (InvalidEncryptionKeyException e) {
+          policy.recordFailure(e);
+        } finally {
+          IOUtils.closeStream(streams);
         }
-      } finally {
-        IOUtils.closeStream(in);
-        IOUtils.closeStream(out);
-        IOUtils.closeSocket(sock);
-      }
+      } while (policy.continueRetryingOrThrow());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
index 0df7067..7e3c877 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
@@ -49,7 +49,8 @@ public class BlockPoolTokenSecretManager extends
     map.put(bpid, secretMgr);
   }
 
-  synchronized BlockTokenSecretManager get(String bpid) {
+  @VisibleForTesting
+  public synchronized BlockTokenSecretManager get(String bpid) {
     BlockTokenSecretManager secretMgr = map.get(bpid);
     if (secretMgr == null) {
       throw new IllegalArgumentException("Block pool " + bpid

http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index a3685ca..4d4c4bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -439,6 +439,12 @@ public class BlockTokenSecretManager extends
   }
   
   @VisibleForTesting
+  public synchronized boolean hasKey(int keyId) {
+    BlockKey key = allKeys.get(keyId);
+    return key != null;
+  }
+
+  @VisibleForTesting
   public synchronized int getSerialNoForTesting() {
     return serialNo;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 3b27752..9ef23d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2487,6 +2487,11 @@ public class DataNode extends ReconfigurableBase
   }
 
 
+  @VisibleForTesting
+  public BlockPoolTokenSecretManager getBlockPoolTokenSecretManager() {
+    return blockPoolTokenSecretManager;
+  }
+
   public static void secureMain(String args[], SecureResources resources) {
     int errorCode = 0;
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
index 30484d1..0ffa933 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java
@@ -21,32 +21,41 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -72,8 +81,12 @@ public class TestEncryptedTransfer {
   
   private static final String PLAIN_TEXT = "this is very secret plain text";
   private static final Path TEST_PATH = new Path("/non-encrypted-file");
-  
-  private void setEncryptionConfigKeys(Configuration conf) {
+
+  private MiniDFSCluster cluster = null;
+  private Configuration conf = null;
+  private FileSystem fs = null;
+
+  private void setEncryptionConfigKeys() {
     conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     if (resolverClazz != null){
@@ -96,389 +109,271 @@ public class TestEncryptedTransfer {
     this.resolverClazz = resolverClazz;
   }
 
-  @Test
-  public void testEncryptedRead() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+  @Before
+  public void setup() throws IOException {
+    conf = new Configuration();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if (fs != null) {
       fs.close();
+    }
+    if (cluster != null) {
       cluster.shutdown();
-      
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-      
-      fs = getFileSystem(conf);
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(SaslDataTransferServer.class));
-      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataTransferSaslUtil.class));
-      try {
-        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-        assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      } finally {
-        logs.stopCapturing();
-        logs1.stopCapturing();
-      }
-      
-      fs.close();
-      
-      if (resolverClazz == null) {
-        // Test client and server negotiate cipher option
-        GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
-            "Server using cipher suite");
-        // Check the IOStreamPair
-        GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
-            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
-      }
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
     }
   }
-  
-  @Test
-  public void testEncryptedReadWithRC4() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
-      
-      setEncryptionConfigKeys(conf);
-      // It'll use 3DES by default, but we set it to rc4 here.
-      conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "rc4");
-      
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-      
-      fs = getFileSystem(conf);
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(SaslDataTransferServer.class));
-      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataTransferSaslUtil.class));
-      try {
-        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-        assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      } finally {
-        logs.stopCapturing();
-        logs1.stopCapturing();
-      }
 
-      fs.close();
+  private FileChecksum writeUnencryptedAndThenRestartEncryptedCluster()
+      throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).build();
 
-      if (resolverClazz == null) {
-        // Test client and server negotiate cipher option
-        GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
-            "Server using cipher suite");
-        // Check the IOStreamPair
-        GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
-            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
-      }
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-  
-  @Test
-  public void testEncryptedReadWithAES() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      conf.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
-          "AES/CTR/NoPadding");
-      cluster = new MiniDFSCluster.Builder(conf).build();
+    fs = getFileSystem(conf);
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
+    fs.close();
+    cluster.shutdown();
 
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
+    setEncryptionConfigKeys();
 
-      setEncryptionConfigKeys(conf);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .manageDataDfsDirs(false)
+        .manageNameDfsDirs(false)
+        .format(false)
+        .startupOption(StartupOption.REGULAR)
+        .build();
 
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
+    fs = getFileSystem(conf);
+    return checksum;
+  }
 
-      fs = getFileSystem(conf);
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(SaslDataTransferServer.class));
-      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataTransferSaslUtil.class));
-      try {
-        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-        assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      } finally {
-        logs.stopCapturing();
-        logs1.stopCapturing();
-      }
+  public void testEncryptedRead(String algorithm, String cipherSuite,
+      boolean matchLog, boolean readAfterRestart) throws IOException {
+    // set encryption algorithm and cipher suites, but don't enable transfer
+    // encryption yet.
+    conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, algorithm);
+    conf.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY,
+        cipherSuite);
 
-      fs.close();
+    FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
+
+    LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(SaslDataTransferServer.class));
+    LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(DataTransferSaslUtil.class));
+    try {
+      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+    } finally {
+      logs.stopCapturing();
+      logs1.stopCapturing();
+    }
 
-      if (resolverClazz == null) {
+    if (resolverClazz == null) {
+      if (matchLog) {
         // Test client and server negotiate cipher option
-        GenericTestUtils.assertMatches(logs.getOutput(),
-            "Server using cipher suite");
+        GenericTestUtils
+            .assertMatches(logs.getOutput(), "Server using cipher suite");
         // Check the IOStreamPair
         GenericTestUtils.assertMatches(logs1.getOutput(),
             "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
-      }
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
+      } else {
+        // Test client and server negotiate cipher option
+        GenericTestUtils
+            .assertDoesNotMatch(logs.getOutput(), "Server using cipher suite");
+        // Check the IOStreamPair
+        GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
+            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
       }
     }
-  }
 
-  @Test
-  public void testEncryptedReadAfterNameNodeRestart() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
-      
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-      
-      fs = getFileSystem(conf);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      fs.close();
-      
+    if (readAfterRestart) {
       cluster.restartNameNode();
       fs = getFileSystem(conf);
       assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
       assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
     }
   }
+
+  @Test
+  public void testEncryptedReadDefaultAlgorithmCipherSuite()
+      throws IOException {
+    testEncryptedRead("", "", false, false);
+  }
+
+  @Test
+  public void testEncryptedReadWithRC4() throws IOException {
+    testEncryptedRead("rc4", "", false, false);
+  }
+
+  @Test
+  public void testEncryptedReadWithAES() throws IOException {
+    testEncryptedRead("", "AES/CTR/NoPadding", true, false);
+  }
+
+  @Test
+  public void testEncryptedReadAfterNameNodeRestart() throws IOException {
+    testEncryptedRead("", "", false, true);
+  }
   
   @Test
   public void testClientThatDoesNotSupportEncryption() throws IOException {
-    MiniDFSCluster cluster = null;
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
+
+    writeUnencryptedAndThenRestartEncryptedCluster();
+
+    DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
+    DFSClient spyClient = Mockito.spy(client);
+    Mockito.doReturn(false).when(spyClient).shouldEncryptData();
+    DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
+
+    LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(DataNode.class));
     try {
-      Configuration conf = new Configuration();
-      // Set short retry timeouts so this test runs faster
-      conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
       assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      fs.close();
-      cluster.shutdown();
-      
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-      
-      
-      fs = getFileSystem(conf);
-      DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
-      DFSClient spyClient = Mockito.spy(client);
-      Mockito.doReturn(false).when(spyClient).shouldEncryptData();
-      DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
-      
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataNode.class));
-      try {
-        assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-        if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){
-          fail("Should not have been able to read without encryption enabled.");
-        }
-      } catch (IOException ioe) {
-        GenericTestUtils.assertExceptionContains("Could not obtain block:",
-            ioe);
-      } finally {
-        logs.stopCapturing();
-      }
-      fs.close();
-      
-      if (resolverClazz == null) {
-        GenericTestUtils.assertMatches(logs.getOutput(),
-        "Failed to read expected encryption handshake from client at");
+      if (resolverClazz != null&&
+          !resolverClazz.endsWith("TestTrustedChannelResolver")){
+        fail("Should not have been able to read without encryption enabled.");
       }
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("Could not obtain block:",
+          ioe);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      logs.stopCapturing();
+    }
+
+    if (resolverClazz == null) {
+      GenericTestUtils.assertMatches(logs.getOutput(),
+      "Failed to read expected encryption handshake from client at");
     }
   }
   
   @Test
   public void testLongLivedReadClientAfterRestart() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
-      
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-      
-      fs = getFileSystem(conf);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      
-      // Restart the NN and DN, after which the client's encryption key will no
-      // longer be valid.
-      cluster.restartNameNode();
-      assertTrue(cluster.restartDataNode(0));
-      
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
+
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+
+    // Restart the NN and DN, after which the client's encryption key will no
+    // longer be valid.
+    cluster.restartNameNode();
+    assertTrue(cluster.restartDataNode(0));
+
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
   }
   
   @Test
   public void testLongLivedWriteClientAfterRestart() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      setEncryptionConfigKeys(conf);
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      
-      // Restart the NN and DN, after which the client's encryption key will no
-      // longer be valid.
-      cluster.restartNameNode();
-      assertTrue(cluster.restartDataNodes());
-      cluster.waitActive();
-      
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    setEncryptionConfigKeys();
+    cluster = new MiniDFSCluster.Builder(conf).build();
+
+    fs = getFileSystem(conf);
+
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+    // Restart the NN and DN, after which the client's encryption key will no
+    // longer be valid.
+    cluster.restartNameNode();
+    assertTrue(cluster.restartDataNodes());
+    cluster.waitActive();
+
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
   }
   
   @Test
   public void testLongLivedClient() throws IOException, InterruptedException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
-      fs.close();
-      cluster.shutdown();
-      
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf)
-          .manageDataDfsDirs(false)
-          .manageNameDfsDirs(false)
-          .format(false)
-          .startupOption(StartupOption.REGULAR)
-          .build();
-      
-      BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
-          .getBlockTokenSecretManager();
-      btsm.setKeyUpdateIntervalForTesting(2 * 1000);
-      btsm.setTokenLifetime(2 * 1000);
-      btsm.clearAllKeysForTesting();
-      
-      fs = getFileSystem(conf);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      
-      // Sleep for 15 seconds, after which the encryption key will no longer be
-      // valid. It needs to be a few multiples of the block token lifetime,
-      // since several block tokens are valid at any given time (the current
-      // and the last two, by default.)
-      LOG.info("Sleeping so that encryption keys expire...");
-      Thread.sleep(15 * 1000);
-      LOG.info("Done sleeping.");
-      
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
-      
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+    FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster();
+
+    BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager();
+    btsm.setKeyUpdateIntervalForTesting(2 * 1000);
+    btsm.setTokenLifetime(2 * 1000);
+    btsm.clearAllKeysForTesting();
+
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+
+    // Sleep for 15 seconds, after which the encryption key will no longer be
+    // valid. It needs to be a few multiples of the block token lifetime,
+    // since several block tokens are valid at any given time (the current
+    // and the last two, by default.)
+    LOG.info("Sleeping so that encryption keys expire...");
+    Thread.sleep(15 * 1000);
+    LOG.info("Done sleeping.");
+
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+    assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
+  }
+
+  @Test
+  public void testLongLivedClientPipelineRecovery()
+      throws IOException, InterruptedException, TimeoutException {
+    if (resolverClazz != null) {
+      // TestTrustedChannelResolver does not use encryption keys.
+      return;
     }
+    // use 4 datanodes to make sure that after 1 data node is stopped,
+    // client only retries establishing pipeline with the 4th node.
+    int numDataNodes = 4;
+    // do not consider load factor when selecting a data node
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
+    setEncryptionConfigKeys();
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDataNodes)
+        .build();
+
+    fs = getFileSystem(conf);
+    DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
+    DFSClient spyClient = Mockito.spy(client);
+    DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
+    writeTestDataToFile(fs);
+
+    BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager();
+    // Reduce key update interval and token life for testing.
+    btsm.setKeyUpdateIntervalForTesting(2 * 1000);
+    btsm.setTokenLifetime(2 * 1000);
+    btsm.clearAllKeysForTesting();
+
+    // Wait until the encryption key becomes invalid.
+    LOG.info("Wait until encryption keys become invalid...");
+
+    final DataEncryptionKey encryptionKey = spyClient.getEncryptionKey();
+    List<DataNode> dataNodes = cluster.getDataNodes();
+    for (final DataNode dn: dataNodes) {
+      GenericTestUtils.waitFor(
+          new Supplier<Boolean>() {
+            @Override
+            public Boolean get() {
+              return !dn.getBlockPoolTokenSecretManager().
+                  get(encryptionKey.blockPoolId)
+                  .hasKey(encryptionKey.keyId);
+            }
+          }, 100, 30*1000
+      );
+    }
+    LOG.info("The encryption key is invalid on all nodes now.");
+    try(FSDataOutputStream out = fs.append(TEST_PATH)) {
+      DFSOutputStream dfstream = (DFSOutputStream) out.getWrappedStream();
+      // shut down the first datanode in the pipeline.
+      DatanodeInfo[] targets = dfstream.getPipeline();
+      cluster.stopDataNode(targets[0].getXferAddr());
+      // write data to induce pipeline recovery
+      out.write(PLAIN_TEXT.getBytes());
+      out.hflush();
+      assertFalse("The first datanode in the pipeline was not replaced.",
+          Arrays.asList(dfstream.getPipeline()).contains(targets[0]));
+    }
+    // verify that InvalidEncryptionKeyException is handled properly
+    Mockito.verify(spyClient, times(1)).clearDataEncryptionKey();
   }
   
   @Test
@@ -497,104 +392,76 @@ public class TestEncryptedTransfer {
   }
   
   private void testEncryptedWrite(int numDns) throws IOException {
-    MiniDFSCluster cluster = null;
+    setEncryptionConfigKeys();
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
+
+    fs = getFileSystem(conf);
+
+    LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(SaslDataTransferServer.class));
+    LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
+        LogFactory.getLog(DataTransferSaslUtil.class));
     try {
-      Configuration conf = new Configuration();
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      
-      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(SaslDataTransferServer.class));
-      LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs(
-          LogFactory.getLog(DataTransferSaslUtil.class));
-      try {
-        writeTestDataToFile(fs);
-      } finally {
-        logs.stopCapturing();
-        logs1.stopCapturing();
-      }
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      fs.close();
-      
-      if (resolverClazz == null) {
-        // Test client and server negotiate cipher option
-        GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
-            "Server using cipher suite");
-        // Check the IOStreamPair
-        GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
-            "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
-      }
+      writeTestDataToFile(fs);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      logs.stopCapturing();
+      logs1.stopCapturing();
+    }
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+    if (resolverClazz == null) {
+      // Test client and server negotiate cipher option
+      GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
+          "Server using cipher suite");
+      // Check the IOStreamPair
+      GenericTestUtils.assertDoesNotMatch(logs1.getOutput(),
+          "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream.");
     }
   }
   
   @Test
   public void testEncryptedAppend() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      setEncryptionConfigKeys(conf);
-      
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    setEncryptionConfigKeys();
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+
+    fs = getFileSystem(conf);
+
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
   }
   
   @Test
   public void testEncryptedAppendRequiringBlockTransfer() throws IOException {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      setEncryptionConfigKeys(conf);
-      
-      // start up 4 DNs
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
-      
-      FileSystem fs = getFileSystem(conf);
-      
-      // Create a file with replication 3, so its block is on 3 / 4 DNs.
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      
-      // Shut down one of the DNs holding a block replica.
-      FSDataInputStream in = fs.open(TEST_PATH);
-      List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
-      in.close();
-      assertEquals(1, locatedBlocks.size());
-      assertEquals(3, locatedBlocks.get(0).getLocations().length);
-      DataNode dn = cluster.getDataNode(locatedBlocks.get(0).getLocations()[0].getIpcPort());
-      dn.shutdown();
-      
-      // Reopen the file for append, which will need to add another DN to the
-      // pipeline and in doing so trigger a block transfer.
-      writeTestDataToFile(fs);
-      assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
-      
-      fs.close();
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    setEncryptionConfigKeys();
+
+    // start up 4 DNs
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+
+    fs = getFileSystem(conf);
+
+    // Create a file with replication 3, so its block is on 3 / 4 DNs.
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
+
+    // Shut down one of the DNs holding a block replica.
+    FSDataInputStream in = fs.open(TEST_PATH);
+    List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
+    in.close();
+    assertEquals(1, locatedBlocks.size());
+    assertEquals(3, locatedBlocks.get(0).getLocations().length);
+    DataNode dn = cluster.getDataNode(
+        locatedBlocks.get(0).getLocations()[0].getIpcPort());
+    dn.shutdown();
+
+    // Reopen the file for append, which will need to add another DN to the
+    // pipeline and in doing so trigger a block transfer.
+    writeTestDataToFile(fs);
+    assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
   }
   
   private static void writeTestDataToFile(FileSystem fs) throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message