From: vinayakumarb@apache.org
To: common-commits@hadoop.apache.org
Message-Id: <8fd2f6ed6bac493992c6dca1186db9b1@git.apache.org>
Subject: hadoop git commit: HDFS-11708. Positional read will fail if replicas moved to different DNs after stream is opened. Contributed by Vinayakumar B.
Date: Wed, 7 Jun 2017 05:29:39 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 5fc4b8567 -> 4a391c72d

HDFS-11708. Positional read will fail if replicas moved to different DNs after stream is opened. Contributed by Vinayakumar B.
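For context, the failure mode is on the client side: DFSInputStream caches the block's locations when the stream is opened, and a positional read issued after the replicas have moved could keep retrying those stale locations instead of picking up a refreshed LocatedBlock. Below is a minimal, hypothetical client-side sketch of that scenario (the file path and setup are made up; only the public FileSystem API is used). The replica move itself is assumed to happen out of band, for example via the Balancer or, in tests, the DFSTestUtil.replaceBlock() helper added in this change.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PreadAfterReplicaMove {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/test"); // hypothetical file, already written with RF=2

        try (FSDataInputStream in = fs.open(file)) {
          // ... while the stream stays open, the block's replicas are moved to
          // other DataNodes (e.g. by the Balancer, or by replaceBlock() in tests) ...

          byte[] buf = new byte[1024];
          // Positional read. Before this fix the pread path could keep using the
          // location list cached at open() time and fail once those DataNodes no
          // longer hold a replica; with the fix, the LocatedBlock refreshed by
          // chooseDataNode() is the one actually read from.
          int n = in.read(0L, buf, 0, buf.length);
          System.out.println("pread returned " + n + " bytes");
        }
      }
    }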
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4a391c72 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4a391c72 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4a391c72 Branch: refs/heads/branch-2.8 Commit: 4a391c72d4a67e2c8c1907bcddf457c94cfd2bce Parents: 5fc4b85 Author: Vinayakumar B Authored: Wed Jun 7 10:36:09 2017 +0530 Committer: Vinayakumar B Committed: Wed Jun 7 10:59:04 2017 +0530 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSInputStream.java | 46 +++--- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 46 ++++++ .../java/org/apache/hadoop/hdfs/TestPread.java | 147 +++++++++++++++++++ .../server/datanode/TestBlockReplacement.java | 42 +----- 4 files changed, 227 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a391c72/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 873fb03..f99a15b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -642,6 +642,8 @@ public class DFSInputStream extends FSInputStream chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; StorageType storageType = retval.storageType; + // Latest block if refreshed by chooseDatanode() + targetBlock = retval.block; try { blockReader = getBlockReader(targetBlock, offsetIntoBlock, @@ -1090,7 +1092,7 @@ public class DFSInputStream extends FSInputStream chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname()); DFSClient.LOG.debug("Connecting to datanode {}", dnAddr); InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr); - return new DNAddrPair(chosenNode, targetAddr, storageType); + return new DNAddrPair(chosenNode, targetAddr, storageType, block); } private static String getBestNodeDNAddrPairErrorString( @@ -1122,12 +1124,13 @@ public class DFSInputStream extends FSInputStream byte[] buf, int offset, Map> corruptedBlockMap) throws IOException { - block = refreshLocatedBlock(block); while (true) { DNAddrPair addressPair = chooseDataNode(block, null); + // Latest block, if refreshed internally + block = addressPair.block; try { - actualGetFromOneDataNode(addressPair, block, start, end, - buf, offset, corruptedBlockMap); + actualGetFromOneDataNode(addressPair, start, end, buf, offset, + corruptedBlockMap); return; } catch (IOException e) { // Ignore. Already processed inside the function. @@ -1149,8 +1152,8 @@ public class DFSInputStream extends FSInputStream int offset = bb.position(); try (TraceScope ignored = dfsClient.getTracer(). 
newScope("hedgedRead" + hedgedReadId, parentSpanId)) { - actualGetFromOneDataNode(datanode, block, start, end, buf, - offset, corruptedBlockMap); + actualGetFromOneDataNode(datanode, start, end, buf, offset, + corruptedBlockMap); return bb; } } @@ -1161,18 +1164,17 @@ public class DFSInputStream extends FSInputStream * Used when reading contiguous blocks */ private void actualGetFromOneDataNode(final DNAddrPair datanode, - LocatedBlock block, final long start, final long end, byte[] buf, - int offset, Map> corruptedBlockMap) + final long start, final long end, byte[] buf, int offset, + Map> corruptedBlockMap) throws IOException { final int length = (int) (end - start + 1); - actualGetFromOneDataNode(datanode, block, start, end, buf, - new int[]{offset}, new int[]{length}, corruptedBlockMap); + actualGetFromOneDataNode(datanode, start, end, buf, new int[] { offset }, + new int[] { length }, corruptedBlockMap); } /** * Read data from one DataNode. * @param datanode the datanode from which to read data - * @param block the located block containing the requested data * @param startInBlk the startInBlk offset of the block * @param endInBlk the endInBlk offset of the block * @param buf the given byte array into which the data is read @@ -1184,9 +1186,8 @@ public class DFSInputStream extends FSInputStream * block replica */ void actualGetFromOneDataNode(final DNAddrPair datanode, - LocatedBlock block, final long startInBlk, final long endInBlk, - byte[] buf, int[] offsets, int[] lengths, - Map> corruptedBlockMap) + final long startInBlk, final long endInBlk, byte[] buf, + int[] offsets, int[] lengths, Map> corruptedBlockMap) throws IOException { DFSClientFaultInjector.get().startFetchFromDatanode(); int refetchToken = 1; // only need to get a new access token once @@ -1194,11 +1195,8 @@ public class DFSInputStream extends FSInputStream final int len = (int) (endInBlk - startInBlk + 1); checkReadPortions(offsets, lengths, len); + LocatedBlock block = datanode.block; while (true) { - // cached block locations may have been updated by chooseDataNode() - // or fetchBlockAt(). Always get the latest list of locations at the - // start of the loop. - block = refreshLocatedBlock(block); BlockReader reader = null; try { DFSClientFaultInjector.get().fetchFromDatanodeException(); @@ -1246,6 +1244,9 @@ public class DFSInputStream extends FSInputStream addToDeadNodes(datanode.info); throw new IOException(msg); } + // Refresh the block for updated tokens in case of token failures or + // encryption key failures. + block = refreshLocatedBlock(block); } finally { if (reader != null) { reader.close(); @@ -1300,7 +1301,6 @@ public class DFSInputStream extends FSInputStream ByteBuffer bb; int len = (int) (end - start + 1); int hedgedReadId = 0; - block = refreshLocatedBlock(block); while (true) { // see HDFS-6591, this metric is used to verify/catch unnecessary loops hedgedReadOpsLoopNumForTesting++; @@ -1310,6 +1310,8 @@ public class DFSInputStream extends FSInputStream // chooseDataNode is a commitment. If no node, we go to // the NN to reget block locations. Only go here on first read. 
chosenNode = chooseDataNode(block, ignored); + // Latest block, if refreshed internally + block = chosenNode.block; bb = ByteBuffer.allocate(len); Callable getFromDataNodeCallable = getFromOneDataNode( chosenNode, block, start, end, bb, @@ -1344,6 +1346,8 @@ public class DFSInputStream extends FSInputStream if (chosenNode == null) { chosenNode = chooseDataNode(block, ignored); } + // Latest block, if refreshed internally + block = chosenNode.block; bb = ByteBuffer.allocate(len); Callable getFromDataNodeCallable = getFromOneDataNode( chosenNode, block, start, end, bb, @@ -1693,12 +1697,14 @@ public class DFSInputStream extends FSInputStream final DatanodeInfo info; final InetSocketAddress addr; final StorageType storageType; + final LocatedBlock block; DNAddrPair(DatanodeInfo info, InetSocketAddress addr, - StorageType storageType) { + StorageType storageType, LocatedBlock block) { this.info = info; this.addr = addr; this.storageType = storageType; + this.block = block; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a391c72/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 70ec37e..9350738 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -50,6 +50,7 @@ import java.lang.reflect.Modifier; import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketException; import java.net.URI; import java.net.URL; import java.net.URLConnection; @@ -110,7 +111,9 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; @@ -1955,6 +1958,49 @@ public class DFSTestUtil { return DFSTestUtil.getFileSystemAs(ugi, conf); } + /* + * Copy a block from sourceProxy to destination. If the block becomes + * over-replicated, preferably remove it from source. + * Return true if a block is successfully copied; otherwise false. 
+ */ + public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source, + DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { + return replaceBlock(block, source, sourceProxy, destination, + StorageType.DEFAULT, Status.SUCCESS); + } + + /* + * Replace block + */ + public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source, + DatanodeInfo sourceProxy, DatanodeInfo destination, + StorageType targetStorageType, Status opStatus) throws IOException, + SocketException { + Socket sock = new Socket(); + try { + sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()), + HdfsConstants.READ_TIMEOUT); + sock.setKeepAlive(true); + // sendRequest + DataOutputStream out = new DataOutputStream(sock.getOutputStream()); + new Sender(out).replaceBlock(block, targetStorageType, + BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), + sourceProxy); + out.flush(); + // receiveResponse + DataInputStream reply = new DataInputStream(sock.getInputStream()); + + BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom( + reply); + while (proto.getStatus() == Status.IN_PROGRESS) { + proto = BlockOpResponseProto.parseDelimitedFrom(reply); + } + return proto.getStatus() == opStatus; + } finally { + sock.close(); + } + } + /** * Test if the given {@link FileStatus} user, group owner and its permission * are expected, throw {@link AssertionError} if any value is not expected. http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a391c72/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java index b1858e2..e154490 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java @@ -24,6 +24,8 @@ import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -31,6 +33,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; @@ -38,6 +43,9 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; @@ -51,6 +59,8 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.google.common.base.Supplier; + /** * This class tests the DFS 
positional read functionality in a single node * mini-cluster. @@ -544,6 +554,143 @@ public class TestPread { } } + /** + * Scenario: 1. Write a file with RF=2, DN1 and DN2
+ * 2. Open the stream; consider the locations in the LocatedBlock to be [DN1, DN2].
+ * 3. Move block from DN2 to DN3.
+ * 4. Let the block get replicated to another DataNode, DN3.
+ * 5. Stop DN1 also.
+ * 6. The current valid block locations in the NameNode are [DN1, DN3].
+ * 7. Consider that subsequent calls to getBlockLocations() always return DN3 as the last + * location.
+ */ + @Test + public void testPreadFailureWithChangedBlockLocations() throws Exception { + doPreadTestWithChangedLocations(); + } + + /** + * Scenario: 1. Write a file with RF=2, DN1 and DN2
+ * 2. Open the stream; consider the locations in the LocatedBlock to be [DN1, DN2].
+ * 3. Move block from DN2 to DN3.
+ * 4. Let the block get replicated to another DataNode, DN3.
+ * 5. Stop DN1 also.
+ * 6. The current valid block locations in the NameNode are [DN1, DN3].
+ * 7. Consider that subsequent calls to getBlockLocations() always return DN3 as the last + * location.
+ */ + @Test + public void testPreadHedgedFailureWithChangedBlockLocations() + throws Exception { + isHedgedRead = true; + doPreadTestWithChangedLocations(); + } + + private void doPreadTestWithChangedLocations() + throws IOException, TimeoutException, InterruptedException { + GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG); + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + if (isHedgedRead) { + conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2); + } + try (MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) { + DistributedFileSystem dfs = cluster.getFileSystem(); + final Path p = new Path("/test"); + String data = "testingmissingblock"; + DFSTestUtil.writeFile(dfs, p, data); + + FSDataInputStream in = dfs.open(p); + List blocks = DFSTestUtil.getAllBlocks(in); + LocatedBlock lb = blocks.get(0); + DFSTestUtil.waitForReplication(cluster, lb.getBlock(), 1, 2, 0); + blocks = DFSTestUtil.getAllBlocks(in); + DatanodeInfo[] locations = null; + for (LocatedBlock locatedBlock : blocks) { + locations = locatedBlock.getLocations(); + DFSClient.LOG + .info(locatedBlock.getBlock() + " " + Arrays.toString(locations)); + } + final DatanodeInfo validDownLocation = locations[0]; + final DFSClient client = dfs.getClient(); + final DFSClient dfsClient = Mockito.spy(client); + // Keep the valid location as last in the locations list for second + // requests + // onwards. + final AtomicInteger count = new AtomicInteger(0); + Mockito.doAnswer(new Answer() { + @Override + public LocatedBlocks answer(InvocationOnMock invocation) + throws Throwable { + if (count.compareAndSet(0, 1)) { + return (LocatedBlocks) invocation.callRealMethod(); + } + Object obj = invocation.callRealMethod(); + LocatedBlocks locatedBlocks = (LocatedBlocks) obj; + LocatedBlock lb = locatedBlocks.get(0); + DatanodeInfo[] locations = lb.getLocations(); + if (!(locations[0].getName().equals(validDownLocation.getName()))) { + // Latest location which is currently down, should be first + DatanodeInfo l = locations[0]; + locations[0] = locations[locations.length - 1]; + locations[locations.length - 1] = l; + } + return locatedBlocks; + } + }).when(dfsClient).getLocatedBlocks(p.toString(), 0); + + // Findout target node to move the block to. 
+ DatanodeInfo[] nodes = + cluster.getNameNodeRpc().getDatanodeReport(DatanodeReportType.LIVE); + DatanodeInfo toMove = null; + List locationsList = Arrays.asList(locations); + for (DatanodeInfo node : nodes) { + if (locationsList.contains(node)) { + continue; + } + toMove = node; + break; + } + // STEP 2: Open stream + DFSInputStream din = dfsClient.open(p.toString()); + // STEP 3: Move replica + final DatanodeInfo source = locations[1]; + final DatanodeInfo destination = toMove; + DFSTestUtil.replaceBlock(lb.getBlock(), source, locations[1], toMove); + // Wait for replica to get deleted + GenericTestUtils.waitFor(new Supplier() { + + @Override + public Boolean get() { + try { + LocatedBlocks lbs = dfsClient.getLocatedBlocks(p.toString(), 0); + LocatedBlock lb = lbs.get(0); + List locations = Arrays.asList(lb.getLocations()); + DFSClient.LOG + .info("Source :" + source + ", destination: " + destination); + DFSClient.LOG.info("Got updated locations :" + locations); + return locations.contains(destination) + && !locations.contains(source); + } catch (IOException e) { + DFSClient.LOG.error("Problem in getting block locations", e); + } + return null; + } + }, 1000, 10000); + DFSTestUtil.waitForReplication(cluster, lb.getBlock(), 1, 2, 0); + // STEP 4: Stop first node in new locations + cluster.stopDataNode(validDownLocation.getName()); + DFSClient.LOG.info("Starting read"); + byte[] buf = new byte[1024]; + int n = din.read(0, buf, 0, data.length()); + assertEquals(data.length(), n); + assertEquals("Data should be read", data, new String(buf, 0, n)); + DFSClient.LOG.info("Read completed"); + } + } + public static void main(String[] args) throws Exception { new TestPread().testPreadDFS(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a391c72/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java index 597dc46..341f846 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java @@ -17,13 +17,13 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.Socket; import java.net.SocketException; import java.util.ArrayList; import java.util.Arrays; @@ -48,19 +48,14 @@ import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.Time; import org.junit.Test; @@ -310,8 +305,8 @@ public class TestBlockReplacement { */ private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source, DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException { - return replaceBlock(block, source, sourceProxy, destination, - StorageType.DEFAULT); + return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination, + StorageType.DEFAULT, Status.SUCCESS); } /* @@ -323,29 +318,8 @@ public class TestBlockReplacement { DatanodeInfo sourceProxy, DatanodeInfo destination, StorageType targetStorageType) throws IOException, SocketException { - Socket sock = new Socket(); - try { - sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()), - HdfsConstants.READ_TIMEOUT); - sock.setKeepAlive(true); - // sendRequest - DataOutputStream out = new DataOutputStream(sock.getOutputStream()); - new Sender(out).replaceBlock(block, targetStorageType, - BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), - sourceProxy); - out.flush(); - // receiveResponse - DataInputStream reply = new DataInputStream(sock.getInputStream()); - - BlockOpResponseProto proto = - BlockOpResponseProto.parseDelimitedFrom(reply); - while (proto.getStatus() == Status.IN_PROGRESS) { - proto = BlockOpResponseProto.parseDelimitedFrom(reply); - } - return proto.getStatus() == Status.SUCCESS; - } finally { - sock.close(); - } + return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination, + targetStorageType, Status.SUCCESS); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org