Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 72F9E1196D for ; Tue, 22 Jul 2014 08:24:13 +0000 (UTC) Received: (qmail 62376 invoked by uid 500); 22 Jul 2014 08:24:13 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 62315 invoked by uid 500); 22 Jul 2014 08:24:13 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 62303 invoked by uid 99); 22 Jul 2014 08:24:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Jul 2014 08:24:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Jul 2014 08:24:05 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 2052E23889E9; Tue, 22 Jul 2014 08:23:38 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1612505 [1/2] - in /hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/main/java/org/apache/hadoop/hdfs/... Date: Tue, 22 Jul 2014 08:23:36 -0000 To: hdfs-commits@hadoop.apache.org From: szetszwo@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140722082338.2052E23889E9@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: szetszwo Date: Tue Jul 22 08:23:34 2014 New Revision: 1612505 URL: http://svn.apache.org/r1612505 Log: Merge r1609845 through r1612502 from trunk. Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/ (props changed) hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed) hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1612432-1612502 Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Jul 22 08:23:34 2014 @@ -324,6 +324,10 @@ Release 2.6.0 - UNRELEASED HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo) + HDFS-6702. Change DFSClient to pass the StorageType from the namenode to + datanodes and change datanode to write block replicas using the specified + storage type. (szetszwo) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1612432-1612502 Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Jul 22 08:23:34 2014 @@ -313,6 +313,7 @@ public class DFSOutputStream extends FSO private DataInputStream blockReplyStream; private ResponseProcessor response = null; private volatile DatanodeInfo[] nodes = null; // list of targets for current block + private volatile StorageType[] storageTypes = null; private volatile String[] storageIDs = null; private final LoadingCache excludedNodes = CacheBuilder.newBuilder() @@ -417,10 +418,12 @@ public class DFSOutputStream extends FSO } private void setPipeline(LocatedBlock lb) { - setPipeline(lb.getLocations(), lb.getStorageIDs()); + setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); } - private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) { + private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, + String[] storageIDs) { this.nodes = nodes; + this.storageTypes = storageTypes; this.storageIDs = storageIDs; } @@ -446,7 +449,7 @@ public class DFSOutputStream extends FSO this.setName("DataStreamer for file " + src); closeResponder(); closeStream(); - setPipeline(null, null); + setPipeline(null, null, null); stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -1031,10 +1034,12 @@ public class DFSOutputStream extends FSO //transfer replica final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1]; final DatanodeInfo[] targets = {nodes[d]}; - transfer(src, targets, lb.getBlockToken()); + final StorageType[] targetStorageTypes = {storageTypes[d]}; + transfer(src, targets, targetStorageTypes, lb.getBlockToken()); } private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final Token blockToken) throws IOException { //transfer replica to the new datanode Socket sock = null; @@ -1056,7 +1061,7 @@ public class DFSOutputStream extends FSO //send the TRANSFER_BLOCK request new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, - targets); + targets, targetStorageTypes); out.flush(); //ack @@ -1135,16 +1140,15 @@ public class DFSOutputStream extends FSO failed.add(nodes[errorIndex]); DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1]; - System.arraycopy(nodes, 0, newnodes, 0, errorIndex); - System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex, - newnodes.length-errorIndex); + arraycopy(nodes, newnodes, errorIndex); + + final StorageType[] newStorageTypes = new StorageType[newnodes.length]; + arraycopy(storageTypes, newStorageTypes, errorIndex); final String[] newStorageIDs = new String[newnodes.length]; - System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex); - System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex, - newStorageIDs.length-errorIndex); + arraycopy(storageIDs, newStorageIDs, errorIndex); - setPipeline(newnodes, newStorageIDs); + setPipeline(newnodes, newStorageTypes, newStorageIDs); // Just took care of a node error while waiting for a node restart if (restartingNodeIndex >= 0) { @@ -1181,7 +1185,7 @@ public class DFSOutputStream extends FSO // set up the pipeline again with the remaining nodes if (failPacket) { // for testing - success = createBlockOutputStream(nodes, newGS, isRecovery); + success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); failPacket = false; try { // Give DNs time to send in bad reports. In real situations, @@ -1190,7 +1194,7 @@ public class DFSOutputStream extends FSO Thread.sleep(2000); } catch (InterruptedException ie) {} } else { - success = createBlockOutputStream(nodes, newGS, isRecovery); + success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); } if (restartingNodeIndex >= 0) { @@ -1242,6 +1246,7 @@ public class DFSOutputStream extends FSO private LocatedBlock nextBlockOutputStream() throws IOException { LocatedBlock lb = null; DatanodeInfo[] nodes = null; + StorageType[] storageTypes = null; int count = dfsClient.getConf().nBlockWriteRetry; boolean success = false; ExtendedBlock oldBlock = block; @@ -1264,11 +1269,12 @@ public class DFSOutputStream extends FSO bytesSent = 0; accessToken = lb.getBlockToken(); nodes = lb.getLocations(); + storageTypes = lb.getStorageTypes(); // // Connect to first DataNode in the list. // - success = createBlockOutputStream(nodes, 0L, false); + success = createBlockOutputStream(nodes, storageTypes, 0L, false); if (!success) { DFSClient.LOG.info("Abandoning " + block); @@ -1289,8 +1295,8 @@ public class DFSOutputStream extends FSO // connects to the first datanode in the pipeline // Returns true if success, otherwise return failure. // - private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS, - boolean recoveryFlag) { + private boolean createBlockOutputStream(DatanodeInfo[] nodes, + StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { if (nodes.length == 0) { DFSClient.LOG.info("nodes are empty for write pipeline of block " + block); @@ -1332,9 +1338,10 @@ public class DFSOutputStream extends FSO // Xmit header info to datanode // + BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage; // send the request - new Sender(out).writeBlock(block, accessToken, dfsClient.clientName, - nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, + new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken, + dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, nodes.length, block.getNumBytes(), bytesSent, newGS, checksum, cachingStrategy.get()); @@ -2197,4 +2204,9 @@ public class DFSOutputStream extends FSO public long getFileId() { return fileId; } + + private static void arraycopy(T[] srcs, T[] dsts, int skipIndex) { + System.arraycopy(srcs, 0, dsts, 0, skipIndex); + System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex); + } } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Tue Jul 22 08:23:34 2014 @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -71,11 +72,20 @@ public interface DataTransferProtocol { /** * Write a block to a datanode pipeline. - * + * The receiver datanode of this call is the next datanode in the pipeline. + * The other downstream datanodes are specified by the targets parameter. + * Note that the receiver {@link DatanodeInfo} is not required in the + * parameter list since the receiver datanode knows its info. However, the + * {@link StorageType} for storing the replica in the receiver datanode is a + * parameter since the receiver datanode may support multiple storage types. + * * @param blk the block being written. + * @param storageType for storing the replica in the receiver datanode. * @param blockToken security token for accessing the block. * @param clientName client's name. - * @param targets target datanodes in the pipeline. + * @param targets other downstream datanodes in the pipeline. + * @param targetStorageTypes target {@link StorageType}s corresponding + * to the target datanodes. * @param source source datanode. * @param stage pipeline stage. * @param pipelineSize the size of the pipeline. @@ -84,9 +94,11 @@ public interface DataTransferProtocol { * @param latestGenerationStamp the latest generation stamp of the block. */ public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, final Token blockToken, final String clientName, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, @@ -110,7 +122,8 @@ public interface DataTransferProtocol { public void transferBlock(final ExtendedBlock blk, final Token blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException; + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException; /** * Request short circuit access file descriptors from a DataNode. @@ -148,11 +161,13 @@ public interface DataTransferProtocol { * It is used for balancing purpose. * * @param blk the block being replaced. + * @param storageType the {@link StorageType} for storing the block. * @param blockToken security token for accessing the block. * @param delHint the hint for deleting the block in the original datanode. * @param source the source datanode for receiving the block. */ public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, final Token blockToken, final String delHint, final DatanodeInfo source) throws IOException; Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Tue Jul 22 08:23:34 2014 @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; @@ -121,10 +122,13 @@ public abstract class Receiver implement /** Receive OP_WRITE_BLOCK */ private void opWriteBlock(DataInputStream in) throws IOException { final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in)); + final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList()); writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - PBHelper.convert(proto.getTargetsList()), + targets, + PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length), PBHelper.convert(proto.getSource()), fromProto(proto.getStage()), proto.getPipelineSize(), @@ -140,10 +144,12 @@ public abstract class Receiver implement private void opTransferBlock(DataInputStream in) throws IOException { final OpTransferBlockProto proto = OpTransferBlockProto.parseFrom(vintPrefixed(in)); + final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList()); transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - PBHelper.convert(proto.getTargetsList())); + targets, + PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length)); } /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */ @@ -176,6 +182,7 @@ public abstract class Receiver implement private void opReplaceBlock(DataInputStream in) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in)); replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getToken()), proto.getDelHint(), PBHelper.convert(proto.getSource())); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Tue Jul 22 08:23:34 2014 @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; @@ -111,9 +112,11 @@ public class Sender implements DataTrans @Override public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, final Token blockToken, final String clientName, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, @@ -130,7 +133,9 @@ public class Sender implements DataTrans OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() .setHeader(header) + .setStorageType(PBHelper.convertStorageType(storageType)) .addAllTargets(PBHelper.convert(targets, 1)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1)) .setStage(toProto(stage)) .setPipelineSize(pipelineSize) .setMinBytesRcvd(minBytesRcvd) @@ -150,12 +155,14 @@ public class Sender implements DataTrans public void transferBlock(final ExtendedBlock blk, final Token blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException { + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException { OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader( blk, clientName, blockToken)) .addAllTargets(PBHelper.convert(targets)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes)) .build(); send(out, Op.TRANSFER_BLOCK, proto); @@ -196,11 +203,13 @@ public class Sender implements DataTrans @Override public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, final Token blockToken, final String delHint, final DatanodeInfo source) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .setStorageType(PBHelper.convertStorageType(storageType)) .setDelHint(delHint) .setSource(PBHelper.convertDatanodeInfo(source)) .build(); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Tue Jul 22 08:23:34 2014 @@ -150,6 +150,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto; @@ -674,14 +675,8 @@ public class PBHelper { targets[i] = PBHelper.convert(locs.get(i)); } - final int storageTypesCount = proto.getStorageTypesCount(); - final StorageType[] storageTypes; - if (storageTypesCount == 0) { - storageTypes = null; - } else { - Preconditions.checkState(storageTypesCount == locs.size()); - storageTypes = convertStorageTypeProtos(proto.getStorageTypesList()); - } + final StorageType[] storageTypes = convertStorageTypes( + proto.getStorageTypesList(), locs.size()); final int storageIDsCount = proto.getStorageIDsCount(); final String[] storageIDs; @@ -969,6 +964,20 @@ public class PBHelper { targets[i] = PBHelper.convert(targetList.get(i)); } + StorageType[][] targetStorageTypes = new StorageType[targetList.size()][]; + List targetStorageTypesList = blkCmd.getTargetStorageTypesList(); + if (targetStorageTypesList.isEmpty()) { // missing storage types + for(int i = 0; i < targetStorageTypes.length; i++) { + targetStorageTypes[i] = new StorageType[targets[i].length]; + Arrays.fill(targetStorageTypes[i], StorageType.DEFAULT); + } + } else { + for(int i = 0; i < targetStorageTypes.length; i++) { + List p = targetStorageTypesList.get(i).getStorageTypesList(); + targetStorageTypes[i] = p.toArray(new StorageType[p.size()]); + } + } + List targetStorageUuidsList = blkCmd.getTargetStorageUuidsList(); String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][]; for(int i = 0; i < targetStorageIDs.length; i++) { @@ -991,7 +1000,7 @@ public class PBHelper { throw new AssertionError("Unknown action type: " + blkCmd.getAction()); } return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets, - targetStorageIDs); + targetStorageTypes, targetStorageIDs); } public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) { @@ -1605,8 +1614,25 @@ public class PBHelper { } } - private static StorageTypeProto convertStorageType( - StorageType type) { + public static List convertStorageTypes( + StorageType[] types) { + return convertStorageTypes(types, 0); + } + + public static List convertStorageTypes( + StorageType[] types, int startIdx) { + if (types == null) { + return null; + } + final List protos = new ArrayList( + types.length); + for (int i = startIdx; i < types.length; ++i) { + protos.add(convertStorageType(types[i])); + } + return protos; + } + + public static StorageTypeProto convertStorageType(StorageType type) { switch(type) { case DISK: return StorageTypeProto.DISK; @@ -1623,7 +1649,7 @@ public class PBHelper { public static DatanodeStorage convert(DatanodeStorageProto s) { return new DatanodeStorage(s.getStorageUuid(), PBHelper.convertState(s.getState()), - PBHelper.convertType(s.getStorageType())); + PBHelper.convertStorageType(s.getStorageType())); } private static State convertState(StorageState state) { @@ -1636,7 +1662,7 @@ public class PBHelper { } } - private static StorageType convertType(StorageTypeProto type) { + public static StorageType convertStorageType(StorageTypeProto type) { switch(type) { case DISK: return StorageType.DISK; @@ -1650,11 +1676,16 @@ public class PBHelper { } } - private static StorageType[] convertStorageTypeProtos( - List storageTypesList) { - final StorageType[] storageTypes = new StorageType[storageTypesList.size()]; - for (int i = 0; i < storageTypes.length; ++i) { - storageTypes[i] = PBHelper.convertType(storageTypesList.get(i)); + public static StorageType[] convertStorageTypes( + List storageTypesList, int expectedSize) { + final StorageType[] storageTypes = new StorageType[expectedSize]; + if (storageTypesList.size() != expectedSize) { // missing storage types + Preconditions.checkState(storageTypesList.isEmpty()); + Arrays.fill(storageTypes, StorageType.DEFAULT); + } else { + for (int i = 0; i < storageTypes.length; ++i) { + storageTypes[i] = convertStorageType(storageTypesList.get(i)); + } } return storageTypes; } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Jul 22 08:23:34 2014 @@ -59,6 +59,7 @@ import org.apache.hadoop.conf.Configured import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -368,7 +369,7 @@ public class Balancer { in = new DataInputStream(new BufferedInputStream(unbufIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); - sendRequest(out, eb, accessToken); + sendRequest(out, eb, StorageType.DEFAULT, accessToken); receiveResponse(in); bytesMoved.addAndGet(block.getNumBytes()); LOG.info("Successfully moved " + this); @@ -400,8 +401,9 @@ public class Balancer { /* Send a block replace request to the output stream*/ private void sendRequest(DataOutputStream out, ExtendedBlock eb, + StorageType storageType, Token accessToken) throws IOException { - new Sender(out).replaceBlock(eb, accessToken, + new Sender(out).replaceBlock(eb, storageType, accessToken, source.getStorageID(), proxySource.getDatanode()); } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Tue Jul 22 08:23:34 2014 @@ -575,7 +575,8 @@ class BPOfferService { switch(cmd.getAction()) { case DatanodeProtocol.DNA_TRANSFER: // Send a copy of a block to another datanode - dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets()); + dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), + bcmd.getTargets(), bcmd.getTargetStorageTypes()); dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length); break; case DatanodeProtocol.DNA_INVALIDATE: Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Jul 22 08:23:34 2014 @@ -37,6 +37,7 @@ import java.util.zip.Checksum; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSOutputSummer; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -122,7 +123,8 @@ class BlockReceiver implements Closeable private boolean syncOnClose; private long restartBudget; - BlockReceiver(final ExtendedBlock block, final DataInputStream in, + BlockReceiver(final ExtendedBlock block, final StorageType storageType, + final DataInputStream in, final String inAddr, final String myAddr, final BlockConstructionStage stage, final long newGs, final long minBytesRcvd, final long maxBytesRcvd, @@ -162,11 +164,11 @@ class BlockReceiver implements Closeable // Open local disk out // if (isDatanode) { //replication or move - replicaInfo = datanode.data.createTemporary(block); + replicaInfo = datanode.data.createTemporary(storageType, block); } else { switch (stage) { case PIPELINE_SETUP_CREATE: - replicaInfo = datanode.data.createRbw(block); + replicaInfo = datanode.data.createRbw(storageType, block); datanode.notifyNamenodeReceivingBlock( block, replicaInfo.getStorageUuid()); break; @@ -198,7 +200,7 @@ class BlockReceiver implements Closeable case TRANSFER_RBW: case TRANSFER_FINALIZED: // this is a transfer destination - replicaInfo = datanode.data.createTemporary(block); + replicaInfo = datanode.data.createTemporary(storageType, block); break; default: throw new IOException("Unsupported stage " + stage + " while receiving block " + block + " from " + inAddr); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Jul 22 08:23:34 2014 @@ -19,11 +19,66 @@ package org.apache.hadoop.hdfs.server.da import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY; +import static org.apache.hadoop.util.ExitUtil.terminate; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.protobuf.BlockingService; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.channels.SocketChannel; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.management.ObjectName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,10 +94,23 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.net.DomainPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.hadoop.hdfs.protocol.datatransfer.*; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer; @@ -50,9 +118,20 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService; -import org.apache.hadoop.hdfs.protocolPB.*; -import org.apache.hadoop.hdfs.security.token.block.*; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -65,7 +144,11 @@ import org.apache.hadoop.hdfs.server.dat import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; -import org.apache.hadoop.hdfs.server.protocol.*; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.resources.Param; import org.apache.hadoop.http.HttpConfig; @@ -88,22 +171,21 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.*; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.JvmPauseMonitor; +import org.apache.hadoop.util.ServicePlugin; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.VersionInfo; import org.mortbay.util.ajax.JSON; -import javax.management.ObjectName; - -import java.io.*; -import java.lang.management.ManagementFactory; -import java.net.*; -import java.nio.channels.SocketChannel; -import java.security.PrivilegedExceptionAction; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.*; -import static org.apache.hadoop.util.ExitUtil.terminate; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.protobuf.BlockingService; /********************************************************** * DataNode is a class (and program) that stores a set of @@ -1475,8 +1557,8 @@ public class DataNode extends Configured return xmitsInProgress.get(); } - private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[]) - throws IOException { + private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets, + StorageType[] xferTargetStorageTypes) throws IOException { BPOfferService bpos = getBPOSForBlock(block); DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId()); @@ -1512,16 +1594,17 @@ public class DataNode extends Configured LOG.info(bpReg + " Starting thread to transfer " + block + " to " + xfersBuilder); - new Daemon(new DataTransfer(xferTargets, block, + new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block, BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start(); } } void transferBlocks(String poolId, Block blocks[], - DatanodeInfo xferTargets[][]) { + DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) { for (int i = 0; i < blocks.length; i++) { try { - transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]); + transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i], + xferTargetStorageTypes[i]); } catch (IOException ie) { LOG.warn("Failed to transfer block " + blocks[i], ie); } @@ -1624,6 +1707,7 @@ public class DataNode extends Configured */ private class DataTransfer implements Runnable { final DatanodeInfo[] targets; + final StorageType[] targetStorageTypes; final ExtendedBlock b; final BlockConstructionStage stage; final private DatanodeRegistration bpReg; @@ -1634,7 +1718,8 @@ public class DataNode extends Configured * Connect to the first item in the target list. Pass along the * entire target list, the block, and the data. */ - DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage, + DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes, + ExtendedBlock b, BlockConstructionStage stage, final String clientname) { if (DataTransferProtocol.LOG.isDebugEnabled()) { DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": " @@ -1644,6 +1729,7 @@ public class DataNode extends Configured + ", targests=" + Arrays.asList(targets)); } this.targets = targets; + this.targetStorageTypes = targetStorageTypes; this.b = b; this.stage = stage; BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId()); @@ -1702,7 +1788,8 @@ public class DataNode extends Configured false, false, true, DataNode.this, null, cachingStrategy); DatanodeInfo srcNode = new DatanodeInfo(bpReg); - new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode, + new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken, + clientname, targets, targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy); // send data & checksum @@ -2403,7 +2490,8 @@ public class DataNode extends Configured * @param client client name */ void transferReplicaForPipelineRecovery(final ExtendedBlock b, - final DatanodeInfo[] targets, final String client) throws IOException { + final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, + final String client) throws IOException { final long storedGS; final long visible; final BlockConstructionStage stage; @@ -2436,7 +2524,7 @@ public class DataNode extends Configured b.setNumBytes(visible); if (targets.length > 0) { - new DataTransfer(targets, b, stage, client).run(); + new DataTransfer(targets, targetStorageTypes, b, stage, client).run(); } } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Jul 22 08:23:34 2014 @@ -45,6 +45,7 @@ import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -524,9 +525,11 @@ class DataXceiver extends Receiver imple @Override public void writeBlock(final ExtendedBlock block, + final StorageType storageType, final Token blockToken, final String clientname, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo srcDataNode, final BlockConstructionStage stage, final int pipelineSize, @@ -590,12 +593,13 @@ class DataXceiver extends Receiver imple if (isDatanode || stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { // open a block receiver - blockReceiver = new BlockReceiver(block, in, + blockReceiver = new BlockReceiver(block, storageType, in, peer.getRemoteAddressString(), peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, datanode, requestedChecksum, cachingStrategy); + storageUuid = blockReceiver.getStorageUuid(); } else { storageUuid = datanode.data.recoverClose( @@ -636,10 +640,10 @@ class DataXceiver extends Receiver imple HdfsConstants.SMALL_BUFFER_SIZE)); mirrorIn = new DataInputStream(unbufMirrorIn); - new Sender(mirrorOut).writeBlock(originalBlock, blockToken, - clientname, targets, srcDataNode, stage, pipelineSize, - minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum, - cachingStrategy); + new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], + blockToken, clientname, targets, targetStorageTypes, srcDataNode, + stage, pipelineSize, minBytesRcvd, maxBytesRcvd, + latestGenerationStamp, requestedChecksum, cachingStrategy); mirrorOut.flush(); @@ -754,7 +758,8 @@ class DataXceiver extends Receiver imple public void transferBlock(final ExtendedBlock blk, final Token blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException { + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException { checkAccess(socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; @@ -763,7 +768,8 @@ class DataXceiver extends Receiver imple final DataOutputStream out = new DataOutputStream( getOutputStream()); try { - datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); + datanode.transferReplicaForPipelineRecovery(blk, targets, + targetStorageTypes, clientName); writeResponse(Status.SUCCESS, null, out); } finally { IOUtils.closeStream(out); @@ -941,6 +947,7 @@ class DataXceiver extends Receiver imple @Override public void replaceBlock(final ExtendedBlock block, + final StorageType storageType, final Token blockToken, final String delHint, final DatanodeInfo proxySource) throws IOException { @@ -1026,8 +1033,8 @@ class DataXceiver extends Receiver imple DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); // open a block receiver and check if the block does not exist - blockReceiver = new BlockReceiver( - block, proxyReply, proxySock.getRemoteSocketAddress().toString(), + blockReceiver = new BlockReceiver(block, storageType, + proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0, 0, 0, "", null, datanode, remoteChecksum, CachingStrategy.newDropBehind()); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Tue Jul 22 08:23:34 2014 @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; @@ -176,8 +177,8 @@ public interface FsDatasetSpi list = new ArrayList(volumes.size()); + for(FsVolumeImpl v : volumes) { + if (v.getStorageType() == storageType) { + list.add(v); + } + } + return blockChooser.chooseVolume(list, blockSize); } long getDfsUsed() throws IOException { Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java Tue Jul 22 08:23:34 2014 @@ -21,6 +21,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; @@ -50,6 +51,7 @@ public class BlockCommand extends Datano final String poolId; final Block[] blocks; final DatanodeInfo[][] targets; + final StorageType[][] targetStorageTypes; final String[][] targetStorageIDs; /** @@ -62,17 +64,20 @@ public class BlockCommand extends Datano this.poolId = poolId; blocks = new Block[blocktargetlist.size()]; targets = new DatanodeInfo[blocks.length][]; + targetStorageTypes = new StorageType[blocks.length][]; targetStorageIDs = new String[blocks.length][]; for(int i = 0; i < blocks.length; i++) { BlockTargetPair p = blocktargetlist.get(i); blocks[i] = p.block; targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets); + targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets); targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets); } } private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {}; + private static final StorageType[][] EMPTY_TARGET_STORAGE_TYPES = {}; private static final String[][] EMPTY_TARGET_STORAGEIDS = {}; /** @@ -81,7 +86,7 @@ public class BlockCommand extends Datano */ public BlockCommand(int action, String poolId, Block blocks[]) { this(action, poolId, blocks, EMPTY_TARGET_DATANODES, - EMPTY_TARGET_STORAGEIDS); + EMPTY_TARGET_STORAGE_TYPES, EMPTY_TARGET_STORAGEIDS); } /** @@ -89,11 +94,13 @@ public class BlockCommand extends Datano * @param blocks blocks related to the action */ public BlockCommand(int action, String poolId, Block[] blocks, - DatanodeInfo[][] targets, String[][] targetStorageIDs) { + DatanodeInfo[][] targets, StorageType[][] targetStorageTypes, + String[][] targetStorageIDs) { super(action); this.poolId = poolId; this.blocks = blocks; this.targets = targets; + this.targetStorageTypes = targetStorageTypes; this.targetStorageIDs = targetStorageIDs; } @@ -109,6 +116,10 @@ public class BlockCommand extends Datano return targets; } + public StorageType[][] getTargetStorageTypes() { + return targetStorageTypes; + } + public String[][] getTargetStorageIDs() { return targetStorageIDs; } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto Tue Jul 22 08:23:34 2014 @@ -113,6 +113,7 @@ message BlockCommandProto { repeated BlockProto blocks = 3; repeated DatanodeInfosProto targets = 4; repeated StorageUuidsProto targetStorageUuids = 5; + repeated StorageTypesProto targetStorageTypes = 6; } /** Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Tue Jul 22 08:23:34 2014 @@ -107,17 +107,21 @@ message OpWriteBlockProto { */ required ChecksumProto requestedChecksum = 9; optional CachingStrategyProto cachingStrategy = 10; + optional StorageTypeProto storageType = 11 [default = DISK]; + repeated StorageTypeProto targetStorageTypes = 12; } message OpTransferBlockProto { required ClientOperationHeaderProto header = 1; repeated DatanodeInfoProto targets = 2; + repeated StorageTypeProto targetStorageTypes = 3; } message OpReplaceBlockProto { required BaseHeaderProto header = 1; required string delHint = 2; required DatanodeInfoProto source = 3; + optional StorageTypeProto storageType = 4 [default = DISK]; } message OpCopyBlockProto { Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Tue Jul 22 08:23:34 2014 @@ -138,6 +138,13 @@ enum StorageTypeProto { } /** + * A list of storage types. + */ +message StorageTypesProto { + repeated StorageTypeProto storageTypes = 1; +} + +/** * A list of storage IDs. */ message StorageUuidsProto { Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Tue Jul 22 08:23:34 2014 @@ -380,7 +380,7 @@ public class DFSTestUtil { */ public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b, int racks, int replicas, int neededReplicas) - throws IOException, TimeoutException, InterruptedException { + throws TimeoutException, InterruptedException { int curRacks = 0; int curReplicas = 0; int curNeededReplicas = 0; @@ -414,7 +414,7 @@ public class DFSTestUtil { */ public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns, Path file, ExtendedBlock b, int corruptRepls) - throws IOException, TimeoutException, InterruptedException { + throws TimeoutException, InterruptedException { int count = 0; final int ATTEMPTS = 50; int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock()); @@ -839,7 +839,8 @@ public class DFSTestUtil { // send the request new Sender(out).transferBlock(b, new Token(), - dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}); + dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, + new StorageType[]{StorageType.DEFAULT}); out.flush(); return BlockOpResponseProto.parseDelimitedFrom(in); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Jul 22 08:23:34 2014 @@ -125,17 +125,16 @@ public class TestDataTransferProtocol { throw eof; } - LOG.info("Received: " +new String(retBuf)); - LOG.info("Expected: " + StringUtils.byteToHexString(recvBuf.toByteArray())); + String received = StringUtils.byteToHexString(retBuf); + String expected = StringUtils.byteToHexString(recvBuf.toByteArray()); + LOG.info("Received: " + received); + LOG.info("Expected: " + expected); if (eofExpected) { throw new IOException("Did not recieve IOException when an exception " + "is expected while reading from " + datanode); } - - byte[] needed = recvBuf.toByteArray(); - assertEquals(StringUtils.byteToHexString(needed), - StringUtils.byteToHexString(retBuf)); + assertEquals(expected, received); } finally { IOUtils.closeSocket(sock); } @@ -184,10 +183,7 @@ public class TestDataTransferProtocol { String description, Boolean eofExcepted) throws IOException { sendBuf.reset(); recvBuf.reset(); - sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl", - new DatanodeInfo[1], null, stage, - 0, block.getNumBytes(), block.getNumBytes(), newGS, - DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy()); + writeBlock(block, stage, newGS, DEFAULT_CHECKSUM); if (eofExcepted) { sendResponse(Status.ERROR, null, null, recvOut); sendRecvData(description, true); @@ -343,10 +339,7 @@ public class TestDataTransferProtocol { MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); try { cluster.waitActive(); - DFSClient dfsClient = new DFSClient( - new InetSocketAddress("localhost", cluster.getNameNodePort()), - conf); - datanode = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0]; + datanode = cluster.getFileSystem().getDataNodeStats(DatanodeReportType.LIVE)[0]; dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr()); FileSystem fileSys = cluster.getFileSystem(); @@ -381,23 +374,14 @@ public class TestDataTransferProtocol { DataChecksum badChecksum = Mockito.spy(DEFAULT_CHECKSUM); Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum(); - sender.writeBlock(new ExtendedBlock(poolId, newBlockId), - BlockTokenSecretManager.DUMMY_TOKEN, "cl", - new DatanodeInfo[1], null, - BlockConstructionStage.PIPELINE_SETUP_CREATE, - 0, 0L, 0L, 0L, - badChecksum, CachingStrategy.newDefaultStrategy()); + writeBlock(poolId, newBlockId, badChecksum); recvBuf.reset(); sendResponse(Status.ERROR, null, null, recvOut); sendRecvData("wrong bytesPerChecksum while writing", true); sendBuf.reset(); recvBuf.reset(); - sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId), - BlockTokenSecretManager.DUMMY_TOKEN, "cl", - new DatanodeInfo[1], null, - BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L, - DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy()); + writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM); PacketHeader hdr = new PacketHeader( 4, // size of packet @@ -416,11 +400,7 @@ public class TestDataTransferProtocol { // test for writing a valid zero size block sendBuf.reset(); recvBuf.reset(); - sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId), - BlockTokenSecretManager.DUMMY_TOKEN, "cl", - new DatanodeInfo[1], null, - BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L, - DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy()); + writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM); hdr = new PacketHeader( 8, // size of packet @@ -532,4 +512,18 @@ public class TestDataTransferProtocol { assertTrue(hdr.sanityCheck(99)); assertFalse(hdr.sanityCheck(100)); } + + void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException { + writeBlock(new ExtendedBlock(poolId, blockId), + BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum); + } + + void writeBlock(ExtendedBlock block, BlockConstructionStage stage, + long newGS, DataChecksum checksum) throws IOException { + sender.writeBlock(block, StorageType.DEFAULT, + BlockTokenSecretManager.DUMMY_TOKEN, "cl", + new DatanodeInfo[1], new StorageType[1], null, stage, + 0, block.getNumBytes(), block.getNumBytes(), newGS, + checksum, CachingStrategy.newDefaultStrategy()); + } } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java Tue Jul 22 08:23:34 2014 @@ -550,8 +550,10 @@ public class TestPBHelper { dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo(); dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo(); String[][] storageIDs = {{"s00"}, {"s10", "s11"}}; + StorageType[][] storageTypes = {{StorageType.DEFAULT}, + {StorageType.DEFAULT, StorageType.DEFAULT}}; BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1", - blocks, dnInfos, storageIDs); + blocks, dnInfos, storageTypes, storageIDs); BlockCommandProto bcProto = PBHelper.convert(bc); BlockCommand bc2 = PBHelper.convert(bcProto); assertEquals(bc.getAction(), bc2.getAction()); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java Tue Jul 22 08:23:34 2014 @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -324,7 +325,7 @@ public abstract class BlockReportTestBas public void blockReport_03() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); - ArrayList blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath); + writeFile(METHOD_NAME, FILE_SIZE, filePath); // all blocks belong to the same file, hence same BP DataNode dn = cluster.getDataNodes().get(DN_N0); @@ -363,7 +364,7 @@ public abstract class BlockReportTestBas // Create a bogus new block which will not be present on the namenode. ExtendedBlock b = new ExtendedBlock( poolId, rand.nextLong(), 1024L, rand.nextLong()); - dn.getFSDataset().createRbw(b); + dn.getFSDataset().createRbw(StorageType.DEFAULT, b); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Tue Jul 22 08:23:34 2014 @@ -744,14 +744,14 @@ public class SimulatedFSDataset implemen } @Override // FsDatasetSpi - public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) - throws IOException { - return createTemporary(b); + public synchronized ReplicaInPipelineInterface createRbw( + StorageType storageType, ExtendedBlock b) throws IOException { + return createTemporary(storageType, b); } @Override // FsDatasetSpi - public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) - throws IOException { + public synchronized ReplicaInPipelineInterface createTemporary( + StorageType storageType, ExtendedBlock b) throws IOException { if (isValidBlock(b)) { throw new ReplicaAlreadyExistsException("Block " + b + " is valid, and cannot be written to."); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java Tue Jul 22 08:23:34 2014 @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.Distribute import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -531,7 +532,7 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - dn.data.createRbw(block); + dn.data.createRbw(StorageType.DEFAULT, block); try { dn.syncBlock(rBlock, initBlockRecords(dn)); fail("Sync should fail"); @@ -554,7 +555,8 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block); + ReplicaInPipelineInterface replicaInfo = dn.data.createRbw( + StorageType.DEFAULT, block); ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1612505&r1=1612504&r2=1612505&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Tue Jul 22 08:23:34 2014 @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; @@ -264,7 +265,8 @@ public class TestBlockReplacement { sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); - new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, + new Sender(out).replaceBlock(block, StorageType.DEFAULT, + BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), sourceProxy); out.flush(); // receiveResponse