hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vinayakum...@apache.org
Subject hadoop git commit: HDFS-11708. Positional read will fail if replicas moved to different DNs after stream is opened. Contributed by Vinayakumar B.
Date Wed, 07 Jun 2017 05:29:39 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 5fc4b8567 -> 4a391c72d


HDFS-11708. Positional read will fail if replicas moved to different DNs after stream is opened.
Contributed by Vinayakumar B.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4a391c72
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4a391c72
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4a391c72

Branch: refs/heads/branch-2.8
Commit: 4a391c72d4a67e2c8c1907bcddf457c94cfd2bce
Parents: 5fc4b85
Author: Vinayakumar B <vinayakumarb@apache.org>
Authored: Wed Jun 7 10:36:09 2017 +0530
Committer: Vinayakumar B <vinayakumarb@apache.org>
Committed: Wed Jun 7 10:59:04 2017 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  46 +++---
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  46 ++++++
 .../java/org/apache/hadoop/hdfs/TestPread.java  | 147 +++++++++++++++++++
 .../server/datanode/TestBlockReplacement.java   |  42 +-----
 4 files changed, 227 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a391c72/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 873fb03..f99a15b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -642,6 +642,8 @@ public class DFSInputStream extends FSInputStream
       chosenNode = retval.info;
       InetSocketAddress targetAddr = retval.addr;
       StorageType storageType = retval.storageType;
+      // Latest block, if refreshed by chooseDataNode()
+      targetBlock = retval.block;
 
       try {
         blockReader = getBlockReader(targetBlock, offsetIntoBlock,
@@ -1090,7 +1092,7 @@ public class DFSInputStream extends FSInputStream
         chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
     DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
     InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
-    return new DNAddrPair(chosenNode, targetAddr, storageType);
+    return new DNAddrPair(chosenNode, targetAddr, storageType, block);
   }
 
   private static String getBestNodeDNAddrPairErrorString(
@@ -1122,12 +1124,13 @@ public class DFSInputStream extends FSInputStream
       byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
-    block = refreshLocatedBlock(block);
     while (true) {
       DNAddrPair addressPair = chooseDataNode(block, null);
+      // Latest block, if refreshed internally
+      block = addressPair.block;
       try {
-        actualGetFromOneDataNode(addressPair, block, start, end,
-            buf, offset, corruptedBlockMap);
+        actualGetFromOneDataNode(addressPair, start, end, buf, offset,
+            corruptedBlockMap);
         return;
       } catch (IOException e) {
         // Ignore. Already processed inside the function.
@@ -1149,8 +1152,8 @@ public class DFSInputStream extends FSInputStream
         int offset = bb.position();
         try (TraceScope ignored = dfsClient.getTracer().
             newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
-          actualGetFromOneDataNode(datanode, block, start, end, buf,
-              offset, corruptedBlockMap);
+          actualGetFromOneDataNode(datanode, start, end, buf, offset,
+              corruptedBlockMap);
           return bb;
         }
       }
@@ -1161,18 +1164,17 @@ public class DFSInputStream extends FSInputStream
    * Used when reading contiguous blocks
    */
   private void actualGetFromOneDataNode(final DNAddrPair datanode,
-      LocatedBlock block, final long start, final long end, byte[] buf,
-      int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      final long start, final long end, byte[] buf, int offset,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     final int length = (int) (end - start + 1);
-    actualGetFromOneDataNode(datanode, block, start, end, buf,
-        new int[]{offset}, new int[]{length}, corruptedBlockMap);
+    actualGetFromOneDataNode(datanode, start, end, buf, new int[] { offset },
+        new int[] { length }, corruptedBlockMap);
   }
 
   /**
    * Read data from one DataNode.
    * @param datanode the datanode from which to read data
-   * @param block the located block containing the requested data
    * @param startInBlk the startInBlk offset of the block
    * @param endInBlk the endInBlk offset of the block
    * @param buf the given byte array into which the data is read
@@ -1184,9 +1186,8 @@ public class DFSInputStream extends FSInputStream
    *                          block replica
    */
   void actualGetFromOneDataNode(final DNAddrPair datanode,
-      LocatedBlock block, final long startInBlk, final long endInBlk,
-      byte[] buf, int[] offsets, int[] lengths,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      final long startInBlk, final long endInBlk, byte[] buf,
+      int[] offsets, int[] lengths, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
@@ -1194,11 +1195,8 @@ public class DFSInputStream extends FSInputStream
     final int len = (int) (endInBlk - startInBlk + 1);
     checkReadPortions(offsets, lengths, len);
 
+    LocatedBlock block = datanode.block;
     while (true) {
-      // cached block locations may have been updated by chooseDataNode()
-      // or fetchBlockAt(). Always get the latest list of locations at the
-      // start of the loop.
-      block = refreshLocatedBlock(block);
       BlockReader reader = null;
       try {
         DFSClientFaultInjector.get().fetchFromDatanodeException();
@@ -1246,6 +1244,9 @@ public class DFSInputStream extends FSInputStream
           addToDeadNodes(datanode.info);
           throw new IOException(msg);
         }
+        // Refresh the block for updated tokens in case of token failures or
+        // encryption key failures.
+        block = refreshLocatedBlock(block);
       } finally {
         if (reader != null) {
           reader.close();
@@ -1300,7 +1301,6 @@ public class DFSInputStream extends FSInputStream
     ByteBuffer bb;
     int len = (int) (end - start + 1);
     int hedgedReadId = 0;
-    block = refreshLocatedBlock(block);
     while (true) {
       // see HDFS-6591, this metric is used to verify/catch unnecessary loops
       hedgedReadOpsLoopNumForTesting++;
@@ -1310,6 +1310,8 @@ public class DFSInputStream extends FSInputStream
         // chooseDataNode is a commitment. If no node, we go to
         // the NN to reget block locations. Only go here on first read.
         chosenNode = chooseDataNode(block, ignored);
+        // Latest block, if refreshed internally
+        block = chosenNode.block;
         bb = ByteBuffer.allocate(len);
         Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
             chosenNode, block, start, end, bb,
@@ -1344,6 +1346,8 @@ public class DFSInputStream extends FSInputStream
           if (chosenNode == null) {
             chosenNode = chooseDataNode(block, ignored);
           }
+          // Latest block, if refreshed internally
+          block = chosenNode.block;
           bb = ByteBuffer.allocate(len);
           Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
               chosenNode, block, start, end, bb,
@@ -1693,12 +1697,14 @@ public class DFSInputStream extends FSInputStream
     final DatanodeInfo info;
     final InetSocketAddress addr;
     final StorageType storageType;
+    final LocatedBlock block;
 
     DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
-        StorageType storageType) {
+        StorageType storageType, LocatedBlock block) {
       this.info = info;
       this.addr = addr;
       this.storageType = storageType;
+      this.block = block;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a391c72/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 70ec37e..9350738 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -50,6 +50,7 @@ import java.lang.reflect.Modifier;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketException;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLConnection;
@@ -110,7 +111,9 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -1955,6 +1958,49 @@ public class DFSTestUtil {
     return DFSTestUtil.getFileSystemAs(ugi, conf);
   }
 
+  /*
+   * Copy a block from sourceProxy to destination. If the block becomes
+   * over-replicated, preferably remove it from source.
+   * Return true if a block is successfully copied; otherwise false.
+   */
+  public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source,
+      DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
+    return replaceBlock(block, source, sourceProxy, destination,
+        StorageType.DEFAULT, Status.SUCCESS);
+  }
+
+  /*
+   * Ask destination to replace the block; true if final status is opStatus.
+   */
+  public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source,
+      DatanodeInfo sourceProxy, DatanodeInfo destination,
+      StorageType targetStorageType, Status opStatus) throws IOException,
+      SocketException {
+    Socket sock = new Socket();
+    try {
+      sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
+          HdfsConstants.READ_TIMEOUT);
+      sock.setKeepAlive(true);
+      // sendRequest
+      DataOutputStream out = new DataOutputStream(sock.getOutputStream());
+      new Sender(out).replaceBlock(block, targetStorageType,
+          BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
+          sourceProxy);
+      out.flush();
+      // receiveResponse
+      DataInputStream reply = new DataInputStream(sock.getInputStream());
+
+      BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(
+          reply);
+      while (proto.getStatus() == Status.IN_PROGRESS) {
+        proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+      }
+      return proto.getStatus() == opStatus;
+    } finally {
+      sock.close();
+    }
+  }
+
   /**
    * Test if the given {@link FileStatus} user, group owner and its permission
    * are expected, throw {@link AssertionError} if any value is not expected.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a391c72/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index b1858e2..e154490 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -24,6 +24,8 @@ import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -31,6 +33,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -38,6 +43,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -51,6 +59,8 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.base.Supplier;
+
 /**
  * This class tests the DFS positional read functionality in a single node
  * mini-cluster.
@@ -544,6 +554,143 @@ public class TestPread {
     }
   }
 
+  /**
+   * Scenario: 1. Write a file with RF=2, DN1 and DN2<br>
+   * 2. Open the stream, Consider Locations are [DN1, DN2] in LocatedBlock.<br>
+   * 3. Move block from DN2 to DN3.<br>
+   * 4. Let the block get replicated to DN3<br>
+   * 5. Stop DN1 also.<br>
+   * 6. Current valid Block locations in NameNode [DN1, DN3]<br>
+   * 7. Consider next calls to getBlockLocations() always returns DN3 as last
+   * location.<br>
+   */
+  @Test
+  public void testPreadFailureWithChangedBlockLocations() throws Exception {
+    doPreadTestWithChangedLocations();
+  }
+
+  /**
+   * Scenario: 1. Write a file with RF=2, DN1 and DN2<br>
+   * 2. Open the stream, Consider Locations are [DN1, DN2] in LocatedBlock.<br>
+   * 3. Move block from DN2 to DN3.<br>
+   * 4. Let the block get replicated to DN3<br>
+   * 5. Stop DN1 also.<br>
+   * 6. Current valid Block locations in NameNode [DN1, DN3]<br>
+   * 7. Consider next calls to getBlockLocations() always returns DN3 as last
+   * location.<br>
+   */
+  @Test
+  public void testPreadHedgedFailureWithChangedBlockLocations()
+      throws Exception {
+    isHedgedRead = true;
+    doPreadTestWithChangedLocations();
+  }
+
+  private void doPreadTestWithChangedLocations()
+      throws IOException, TimeoutException, InterruptedException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    if (isHedgedRead) {
+      conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2);
+    }
+    try (MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      final Path p = new Path("/test");
+      String data = "testingmissingblock";
+      DFSTestUtil.writeFile(dfs, p, data);
+
+      FSDataInputStream in = dfs.open(p);
+      List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(in);
+      LocatedBlock lb = blocks.get(0);
+      DFSTestUtil.waitForReplication(cluster, lb.getBlock(), 1, 2, 0);
+      blocks = DFSTestUtil.getAllBlocks(in);
+      DatanodeInfo[] locations = null;
+      for (LocatedBlock locatedBlock : blocks) {
+        locations = locatedBlock.getLocations();
+        DFSClient.LOG
+            .info(locatedBlock.getBlock() + " " + Arrays.toString(locations));
+      }
+      final DatanodeInfo validDownLocation = locations[0];
+      final DFSClient client = dfs.getClient();
+      final DFSClient dfsClient = Mockito.spy(client);
+      // From the second request onwards, keep the valid location last in
+      // the returned locations list, so that the location which is down
+      // comes first.
+      final AtomicInteger count = new AtomicInteger(0);
+      Mockito.doAnswer(new Answer<LocatedBlocks>() {
+        @Override
+        public LocatedBlocks answer(InvocationOnMock invocation)
+            throws Throwable {
+          if (count.compareAndSet(0, 1)) {
+            return (LocatedBlocks) invocation.callRealMethod();
+          }
+          Object obj = invocation.callRealMethod();
+          LocatedBlocks locatedBlocks = (LocatedBlocks) obj;
+          LocatedBlock lb = locatedBlocks.get(0);
+          DatanodeInfo[] locations = lb.getLocations();
+          if (!(locations[0].getName().equals(validDownLocation.getName()))) {
+            // Latest location which is currently down, should be first
+            DatanodeInfo l = locations[0];
+            locations[0] = locations[locations.length - 1];
+            locations[locations.length - 1] = l;
+          }
+          return locatedBlocks;
+        }
+      }).when(dfsClient).getLocatedBlocks(p.toString(), 0);
+
+      // Find out the target node to move the block to.
+      DatanodeInfo[] nodes =
+          cluster.getNameNodeRpc().getDatanodeReport(DatanodeReportType.LIVE);
+      DatanodeInfo toMove = null;
+      List<DatanodeInfo> locationsList = Arrays.asList(locations);
+      for (DatanodeInfo node : nodes) {
+        if (locationsList.contains(node)) {
+          continue;
+        }
+        toMove = node;
+        break;
+      }
+      // STEP 2: Open stream
+      DFSInputStream din = dfsClient.open(p.toString());
+      // STEP 3: Move replica
+      final DatanodeInfo source = locations[1];
+      final DatanodeInfo destination = toMove;
+      DFSTestUtil.replaceBlock(lb.getBlock(), source, locations[1], toMove);
+      // Wait for replica to get deleted
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+        @Override
+        public Boolean get() {
+          try {
+            LocatedBlocks lbs = dfsClient.getLocatedBlocks(p.toString(), 0);
+            LocatedBlock lb = lbs.get(0);
+            List<DatanodeInfo> locations = Arrays.asList(lb.getLocations());
+            DFSClient.LOG
+                .info("Source :" + source + ", destination: " + destination);
+            DFSClient.LOG.info("Got updated locations :" + locations);
+            return locations.contains(destination)
+                && !locations.contains(source);
+          } catch (IOException e) {
+            DFSClient.LOG.error("Problem in getting block locations", e);
+          }
+          return null;
+        }
+      }, 1000, 10000);
+      DFSTestUtil.waitForReplication(cluster, lb.getBlock(), 1, 2, 0);
+      // STEP 4: Stop first node in new locations
+      cluster.stopDataNode(validDownLocation.getName());
+      DFSClient.LOG.info("Starting read");
+      byte[] buf = new byte[1024];
+      int n = din.read(0, buf, 0, data.length());
+      assertEquals(data.length(), n);
+      assertEquals("Data should be read", data, new String(buf, 0, n));
+      DFSClient.LOG.info("Read completed");
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     new TestPread().testPreadDFS();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a391c72/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index 597dc46..341f846 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -48,19 +48,14 @@ import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
@@ -310,8 +305,8 @@ public class TestBlockReplacement {
    */
   private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
       DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
-    return replaceBlock(block, source, sourceProxy, destination,
-        StorageType.DEFAULT);
+    return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination,
+        StorageType.DEFAULT, Status.SUCCESS);
   }
 
   /*
@@ -323,29 +318,8 @@ public class TestBlockReplacement {
       DatanodeInfo sourceProxy,
       DatanodeInfo destination,
       StorageType targetStorageType) throws IOException, SocketException {
-    Socket sock = new Socket();
-    try {
-      sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
-          HdfsConstants.READ_TIMEOUT);
-      sock.setKeepAlive(true);
-      // sendRequest
-      DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-      new Sender(out).replaceBlock(block, targetStorageType,
-          BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
-          sourceProxy);
-      out.flush();
-      // receiveResponse
-      DataInputStream reply = new DataInputStream(sock.getInputStream());
-
-      BlockOpResponseProto proto =
-          BlockOpResponseProto.parseDelimitedFrom(reply);
-      while (proto.getStatus() == Status.IN_PROGRESS) {
-        proto = BlockOpResponseProto.parseDelimitedFrom(reply);
-      }
-      return proto.getStatus() == Status.SUCCESS;
-    } finally {
-      sock.close();
-    }
+    return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination,
+        targetStorageType, Status.SUCCESS);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message