From: epayne@apache.org
To: common-commits@hadoop.apache.org
Subject: hadoop git commit: HDFS-9945. Datanode command for evicting writers. Contributed by Kihwal Lee (cherry picked from commit aede8c10ecad4f2a8802a834e4bd0b8286cebade)
Date: Wed, 6 Apr 2016 20:34:18 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 8514ab8cb -> 497c65ad0


HDFS-9945. Datanode command for evicting writers. Contributed by Kihwal Lee
(cherry picked from commit aede8c10ecad4f2a8802a834e4bd0b8286cebade)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/497c65ad
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/497c65ad
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/497c65ad

Branch: refs/heads/branch-2.8
Commit: 497c65ad0cb1871af565937c8b14d174ec3c385c
Parents: 8514ab8
Author: Eric Payne
Authored: Wed Apr 6 20:20:14 2016 +0000
Committer: Eric Payne
Committed: Wed Apr 6 20:33:40 2016 +0000

----------------------------------------------------------------------
 .../hdfs/protocol/ClientDatanodeProtocol.java   |  7 +++
 .../ClientDatanodeProtocolTranslatorPB.java     | 12 +++++
 .../src/main/proto/ClientDatanodeProtocol.proto | 10 ++++
 ...tDatanodeProtocolServerSideTranslatorPB.java | 15 ++++++
 .../hdfs/server/datanode/BlockReceiver.java     |  3 ++
 .../hadoop/hdfs/server/datanode/DataNode.java   |  7 +++
 .../hdfs/server/datanode/DataXceiver.java       | 48 +++++++++++++++----
 .../hdfs/server/datanode/DataXceiverServer.java |  6 +++
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java  | 21 +++++++++
 .../TestClientProtocolForPipelineRecovery.java  | 49 ++++++++++++++++++++
 10 files changed, 170 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
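
This commit threads one new operation, evictWriters(), through every layer: the
ClientDatanodeProtocol interface, the protobuf message and service definition,
the client- and server-side translators, the DataNode/DataXceiver
implementation, and a new dfsadmin subcommand. Operators invoke it as
"hdfs dfsadmin -evictWriters <datanode_host:ipc_port>"; since the datanode
checks superuser privilege, it must run as an HDFS superuser. A minimal
programmatic sketch of the same call, mirroring the test at the end of this
commit (the datanode address below is a placeholder, not from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;
    import org.apache.hadoop.util.ToolRunner;

    public class EvictWritersExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Ask one datanode to evict all clients currently writing to it;
        // equivalent to: hdfs dfsadmin -evictWriters dn1.example.com:50020
        // (the host:port is a placeholder; use a real DN IPC address).
        int exitCode = ToolRunner.run(new DFSAdmin(conf),
            new String[] { "-evictWriters", "dn1.example.com:50020" });
        System.exit(exitCode);
      }
    }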

http://git-wip-us.apache.org/repos/asf/hadoop/blob/497c65ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
index 5621de0..a60c17d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -141,6 +141,13 @@ public interface ClientDatanodeProtocol {
   void shutdownDatanode(boolean forUpgrade) throws IOException;
 
   /**
+   * Evict clients that are writing to a datanode.
+   *
+   * @throws IOException
+   */
+  void evictWriters() throws IOException;
+
+  /**
    * Obtains datanode info
    *
    * @return software/config version and uptime of the datanode

http://git-wip-us.apache.org/repos/asf/hadoop/blob/497c65ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 31315c4..ec6b049 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@@ -110,6 +111,8 @@ public class ClientDatanodeProtocolTranslatorPB implements
   private static final GetBalancerBandwidthRequestProto
       VOID_GET_BALANCER_BANDWIDTH =
       GetBalancerBandwidthRequestProto.newBuilder().build();
+  private final static EvictWritersRequestProto VOID_EVICT_WRITERS =
+      EvictWritersRequestProto.newBuilder().build();
 
   public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
       Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
@@ -293,6 +296,15 @@ public class ClientDatanodeProtocolTranslatorPB implements
   }
 
   @Override
+  public void evictWriters() throws IOException {
+    try {
+      rpcProxy.evictWriters(NULL_CONTROLLER, VOID_EVICT_WRITERS);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
   public DatanodeLocalInfo getDatanodeInfo() throws IOException {
     GetDatanodeInfoResponseProto response;
     try {
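
EvictWritersRequestProto carries no fields, so the translator caches one
pre-built instance (VOID_EVICT_WRITERS) instead of building a request per
call, matching the other void datanode RPCs. A small sketch of the same idiom
against the generated class (assumes the classes generated from the .proto
below are on the classpath):

    import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;

    public class VoidRequestIdiom {
      // One shared instance is safe: protobuf messages are immutable,
      // and an empty message has no per-call state.
      static final EvictWritersRequestProto VOID_EVICT_WRITERS =
          EvictWritersRequestProto.newBuilder().build();

      public static void main(String[] args) {
        // An empty message serializes to zero bytes on the wire.
        System.out.println(VOID_EVICT_WRITERS.getSerializedSize());  // 0
      }
    }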

http://git-wip-us.apache.org/repos/asf/hadoop/blob/497c65ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
index 83f66d9..d0c83b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
@@ -139,6 +139,13 @@ message ShutdownDatanodeRequestProto {
 message ShutdownDatanodeResponseProto {
 }
 
+/** Tell datanode to evict active clients that are writing */
+message EvictWritersRequestProto {
+}
+
+message EvictWritersResponseProto {
+}
+
 /**
  * Ping datanode for liveness and quick info
  */
@@ -239,6 +246,9 @@ service ClientDatanodeProtocolService {
   rpc shutdownDatanode(ShutdownDatanodeRequestProto)
       returns(ShutdownDatanodeResponseProto);
 
+  rpc evictWriters(EvictWritersRequestProto)
+      returns(EvictWritersResponseProto);
+
   rpc getDatanodeInfo(GetDatanodeInfoRequestProto)
       returns(GetDatanodeInfoResponseProto);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/497c65ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index ff18f6d..fbf5797 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@@ -83,6 +85,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       StartReconfigurationResponseProto.newBuilder().build();
   private final static TriggerBlockReportResponseProto TRIGGER_BLOCK_REPORT_RESP =
       TriggerBlockReportResponseProto.newBuilder().build();
+  private final static EvictWritersResponseProto EVICT_WRITERS_RESP =
+      EvictWritersResponseProto.newBuilder().build();
 
   private final ClientDatanodeProtocol impl;
 
@@ -190,6 +194,17 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
     return SHUTDOWN_DATANODE_RESP;
   }
 
+  @Override
+  public EvictWritersResponseProto evictWriters(RpcController unused,
+      EvictWritersRequestProto request) throws ServiceException {
+    try {
+      impl.evictWriters();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return EVICT_WRITERS_RESP;
+  }
+
   public GetDatanodeInfoResponseProto getDatanodeInfo(RpcController unused,
       GetDatanodeInfoRequestProto request) throws ServiceException {
     GetDatanodeInfoResponseProto res;
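
Together the two translators define the error contract: the server wraps any
IOException from the datanode implementation in a ServiceException, and the
client unwraps it back into an IOException via
ProtobufHelper.getRemoteException, so callers of
ClientDatanodeProtocol.evictWriters() only ever see IOException. A
self-contained toy (plain Java, not Hadoop code) of that wrap/unwrap round
trip:

    import java.io.IOException;

    public class ErrorContractSketch {
      static class ServiceException extends Exception {
        ServiceException(Throwable cause) { super(cause); }
      }

      // Server side: the implementation's IOException crosses the RPC
      // boundary wrapped in a ServiceException.
      static void serverSide() throws ServiceException {
        try {
          throw new IOException("eviction refused: not a superuser");
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }

      // Client side: unwrap back to IOException, analogous to what
      // ProtobufHelper.getRemoteException does.
      static void clientSide() throws IOException {
        try {
          serverSide();
        } catch (ServiceException e) {
          throw (IOException) e.getCause();
        }
      }

      public static void main(String[] args) {
        try {
          clientSide();
        } catch (IOException e) {
          System.out.println("caller sees: " + e.getMessage());
        }
      }
    }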

http://git-wip-us.apache.org/repos/asf/hadoop/blob/497c65ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index bbb9096..a60d188 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -878,6 +878,9 @@ class BlockReceiver implements Closeable {
   }
 
   public void sendOOB() throws IOException, InterruptedException {
+    if (isDatanode) {
+      return;
+    }
     ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
         .getRestartOOBStatus());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/497c65ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 7ee2971..56bec71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2936,6 +2936,13 @@ public class DataNode extends ReconfigurableBase
   }
 
   @Override //ClientDatanodeProtocol
+  public void evictWriters() throws IOException {
+    checkSuperuserPrivilege();
+    LOG.info("Evicting all writers.");
+    xserver.stopWriters();
+  }
+
+  @Override //ClientDatanodeProtocol
   public DatanodeLocalInfo getDatanodeInfo() {
     long uptime = ManagementFactory.getRuntimeMXBean().getUptime()/1000;
     return new DatanodeLocalInfo(VersionInfo.getVersion(),
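
DataNode.evictWriters() is gated by checkSuperuserPrivilege() before fanning
out to xserver.stopWriters(), so on a secured cluster the admin call must
arrive as the HDFS superuser. A hedged sketch of running the command under a
specific user; the user name "hdfs" and the datanode address are assumptions,
not from this commit:

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;
    import org.apache.hadoop.security.UserGroupInformation;

    public class EvictAsSuperuser {
      public static void main(String[] args) throws Exception {
        final Configuration conf = new HdfsConfiguration();
        // "hdfs" is a typical superuser name; adjust for the deployment.
        UserGroupInformation ugi =
            UserGroupInformation.createRemoteUser("hdfs");
        int rc = ugi.doAs(new PrivilegedExceptionAction<Integer>() {
          @Override
          public Integer run() throws Exception {
            return new DFSAdmin(conf).run(
                new String[] { "-evictWriters", "dn1.example.com:50020" });
          }
        });
        System.exit(rc);
      }
    }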

http://git-wip-us.apache.org/repos/asf/hadoop/blob/497c65ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index c23e8a7..e5ae98f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -115,6 +115,7 @@ class DataXceiver extends Receiver implements Runnable {
   private BlockReceiver blockReceiver = null;
   private final int ioFileBufferSize;
   private final int smallBufferSize;
+  private Thread xceiver = null;
 
   /**
    * Client Name used in previous operation. Not available on first request
@@ -177,9 +178,38 @@ class DataXceiver extends Receiver implements Runnable {
   }
 
   public void sendOOB() throws IOException, InterruptedException {
+    BlockReceiver br = getCurrentBlockReceiver();
+    if (br == null) {
+      return;
+    }
+    // This doesn't need to be in a critical section. Although the client
+    // can reuse the connection to issue a different request, trying to
+    // send an OOB through the recently closed block receiver is harmless.
     LOG.info("Sending OOB to peer: " + peer);
-    if(blockReceiver!=null)
-      blockReceiver.sendOOB();
+    br.sendOOB();
+  }
+
+  public void stopWriter() {
+    // We want to interrupt the xceiver only when it is serving writes.
+    synchronized(this) {
+      if (getCurrentBlockReceiver() == null) {
+        return;
+      }
+      xceiver.interrupt();
+    }
+    LOG.info("Stopped the writer: " + peer);
+  }
+
+  /**
+   * blockReceiver is updated at multiple places. Use the synchronized setter
+   * and getter.
+   */
+  private synchronized void setCurrentBlockReceiver(BlockReceiver br) {
+    blockReceiver = br;
+  }
+
+  private synchronized BlockReceiver getCurrentBlockReceiver() {
+    return blockReceiver;
+  }
 
   /**
@@ -191,6 +221,9 @@ class DataXceiver extends Receiver implements Runnable {
     Op op = null;
     try {
+      synchronized(this) {
+        xceiver = Thread.currentThread();
+      }
       dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
@@ -678,12 +711,12 @@ class DataXceiver extends Receiver implements Runnable {
     if (isDatanode ||
         stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
       // open a block receiver
-      blockReceiver = getBlockReceiver(block, storageType, in,
+      setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
           peer.getRemoteAddressString(),
           peer.getLocalAddressString(),
           stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
           clientname, srcDataNode, datanode, requestedChecksum,
-          cachingStrategy, allowLazyPersist, pinning);
+          cachingStrategy, allowLazyPersist, pinning));
       replica = blockReceiver.getReplica();
     } else {
       replica = datanode.data.recoverClose(
@@ -852,7 +885,7 @@ class DataXceiver extends Receiver implements Runnable {
       IOUtils.closeStream(replyOut);
       IOUtils.closeSocket(mirrorSock);
       IOUtils.closeStream(blockReceiver);
-      blockReceiver = null;
+      setCurrentBlockReceiver(null);
     }
 
     //update metrics
@@ -1079,7 +1112,6 @@ class DataXceiver extends Receiver implements Runnable {
     DataOutputStream proxyOut = null;
     Status opStatus = SUCCESS;
     String errMsg = null;
-    BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
     boolean IoeDuringCopyBlockOperation = false;
     try {
@@ -1138,11 +1170,11 @@ class DataXceiver extends Receiver implements Runnable {
       DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
           checksumInfo.getChecksum());
       // open a block receiver and check if the block does not exist
-      blockReceiver = getBlockReceiver(block, storageType,
+      setCurrentBlockReceiver(getBlockReceiver(block, storageType,
           proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
           null, 0, 0, 0, "", null, datanode, remoteChecksum,
-          CachingStrategy.newDropBehind(), false, false);
+          CachingStrategy.newDropBehind(), false, false));
 
       // receive a block
       blockReceiver.receiveBlock(null, null, replyOut, null,
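
The eviction mechanism is plain thread interruption: stopWriter() interrupts
the xceiver thread only while a BlockReceiver is active, and the synchronized
getter/setter keeps the null check and the interrupt consistent with pipeline
setup and teardown. The interrupted thread fails its blocking packet I/O,
which the client then handles as an ordinary pipeline failure and recovers
from. A self-contained toy (plain Java, not Hadoop code) of the same pattern:

    public class EvictionSketch {
      private Thread worker;       // plays the role of the xceiver thread
      private Object currentWrite; // plays the role of the BlockReceiver

      private synchronized void setCurrentWrite(Object w) {
        currentWrite = w;
      }

      public synchronized void stopWriter() {
        if (currentWrite == null) {
          return;                  // not serving a write; leave it alone
        }
        worker.interrupt();        // unblocks the writer's I/O wait
      }

      public static void main(String[] args) throws Exception {
        final EvictionSketch s = new EvictionSketch();
        s.worker = new Thread(new Runnable() {
          @Override
          public void run() {
            s.setCurrentWrite(new Object());
            try {
              Thread.sleep(60000); // stands in for blocking packet I/O
            } catch (InterruptedException e) {
              System.out.println("writer evicted");
            } finally {
              s.setCurrentWrite(null);
            }
          }
        });
        s.worker.start();
        Thread.sleep(100);         // give the "write" time to begin
        s.stopWriter();
        s.worker.join();
      }
    }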

http://git-wip-us.apache.org/repos/asf/hadoop/blob/497c65ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 8152e6f..78a07c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -256,6 +256,12 @@ class DataXceiverServer implements Runnable {
       }
     }
   }
+
+  public synchronized void stopWriters() {
+    for (Peer p : peers.keySet()) {
+      peersXceiver.get(p).stopWriter();
+    }
+  }
 
   // Notify all peers of the shutdown and restart.
   // datanode.shouldRun should still be true and datanode.restarting should

http://git-wip-us.apache.org/repos/asf/hadoop/blob/497c65ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 8e89304..96b7542 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -1059,6 +1059,10 @@ public class DFSAdmin extends FsShell {
       + "\tclients will timeout and ignore the datanode. In such case, the\n"
       + "\tfast start-up mode will also be disabled.\n";
 
+    String evictWriters = "-evictWriters <datanode_host:ipc_port>\n"
+      + "\tMake the datanode evict all clients that are writing a block.\n"
+      + "\tThis is useful if decommissioning is hung due to slow writers.\n";
+
     String getDatanodeInfo = "-getDatanodeInfo <datanode_host:ipc_port>\n"
       + "\tGet the information about the given datanode. This command can\n"
       + "\tbe used for checking if a datanode is alive.\n";
@@ -1128,6 +1132,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(disallowSnapshot);
     } else if ("shutdownDatanode".equalsIgnoreCase(cmd)) {
       System.out.println(shutdownDatanode);
+    } else if ("evictWriters".equalsIgnoreCase(cmd)) {
+      System.out.println(evictWriters);
     } else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) {
       System.out.println(getDatanodeInfo);
     } else if ("help".equals(cmd)) {
@@ -1162,6 +1168,7 @@ public class DFSAdmin extends FsShell {
     System.out.println(allowSnapshot);
     System.out.println(disallowSnapshot);
     System.out.println(shutdownDatanode);
+    System.out.println(evictWriters);
     System.out.println(getDatanodeInfo);
     System.out.println(triggerBlockReport);
     System.out.println(help);
@@ -1935,6 +1942,8 @@ public class DFSAdmin extends FsShell {
       exitCode = fetchImage(argv, i);
     } else if ("-shutdownDatanode".equals(cmd)) {
       exitCode = shutdownDatanode(argv, i);
+    } else if ("-evictWriters".equals(cmd)) {
+      exitCode = evictWriters(argv, i);
     } else if ("-getDatanodeInfo".equals(cmd)) {
       exitCode = getDatanodeInfo(argv, i);
     } else if ("-reconfig".equals(cmd)) {
@@ -2042,6 +2051,18 @@ public class DFSAdmin extends FsShell {
     return 0;
   }
 
+  private int evictWriters(String[] argv, int i) throws IOException {
+    final String dn = argv[i];
+    ClientDatanodeProtocol dnProxy = getDataNodeProxy(dn);
+    try {
+      dnProxy.evictWriters();
+      System.out.println("Requested writer eviction to datanode " + dn);
+    } catch (IOException ioe) {
+      return -1;
+    }
+    return 0;
+  }
+
   private int getDatanodeInfo(String[] argv, int i) throws IOException {
     ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
     try {
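
On success the new subcommand handler prints a confirmation and returns 0; on
IOException it returns -1. The help entry is also reachable without a running
cluster. A minimal sketch (assuming the standard dfsadmin -help routing) that
prints the text added above:

    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;

    public class ShowEvictWritersHelp {
      public static void main(String[] args) throws Exception {
        // Equivalent to: hdfs dfsadmin -help evictWriters
        new DFSAdmin(new HdfsConfiguration()).run(
            new String[] { "-help", "evictWriters" });
      }
    }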

http://git-wip-us.apache.org/repos/asf/hadoop/blob/497c65ad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 0eeb3b7..5e320fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -271,6 +271,55 @@ public class TestClientProtocolForPipelineRecovery {
     }
   }
 
+  /**
+   * Test that the writer is kicked out of a node.
+   */
+  @Test
+  public void testEvictWriter() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes((int)3)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      Path file = new Path("testEvictWriter.dat");
+      FSDataOutputStream out = fs.create(file, (short)2);
+      out.write(0x31);
+      out.hflush();
+
+      // get nodes in the pipeline
+      DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
+      DatanodeInfo[] nodes = dfsOut.getPipeline();
+      Assert.assertEquals(2, nodes.length);
+      String dnAddr = nodes[1].getIpcAddr(false);
+
+      // evict the writer from the second datanode and wait until
+      // the pipeline is rebuilt.
+      DFSAdmin dfsadmin = new DFSAdmin(conf);
+      final String[] args1 = {"-evictWriters", dnAddr };
+      Assert.assertEquals(0, dfsadmin.run(args1));
+      out.write(0x31);
+      out.hflush();
+
+      // get the new pipeline and check the evicted node is not in it.
+      nodes = dfsOut.getPipeline();
+      try {
+        Assert.assertTrue(nodes.length > 0);
+        for (int i = 0; i < nodes.length; i++) {
+          Assert.assertFalse(dnAddr.equals(nodes[i].getIpcAddr(false)));
+        }
+      } finally {
+        out.close();
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /** Test restart timeout */
   @Test
   public void testPipelineRecoveryOnRestartFailure() throws Exception {