Subject: svn commit: r787537 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hadoop/hdfs/server/datanode/
Date: Tue, 23 Jun 2009 04:25:28 -0000
From: szetszwo@apache.org
To: hdfs-commits@hadoop.apache.org
Reply-To: hdfs-dev@hadoop.apache.org

Author: szetszwo
Date: Tue Jun 23 04:25:27 2009
New Revision: 787537

URL: http://svn.apache.org/viewvc?rev=787537&view=rev
Log:
HDFS-377. Separate codes which implement DataTransferProtocol.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Jun 23 04:25:27 2009
@@ -14,3 +14,6 @@
     HADOOP-4687. HDFS is split from Hadoop Core. It is a subproject under
     Hadoop (Owen O'Malley)
+
+    HDFS-377. Separate codes which implement DataTransferProtocol.
+    (szetszwo)
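Note: at a high level, this change moves the hand-rolled wire serialization that was duplicated across DFSClient, Balancer, DataNode and DataXceiver into two nested classes of DataTransferProtocol: a static Sender that frames each request, and an abstract Receiver that parses requests and dispatches to per-operation callbacks. A minimal before/after sketch, using the OP_BLOCK_CHECKSUM case from the DFSClient hunk below (stream and block setup assumed):

    // Before: every call site framed the request by hand.
    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
    out.write(DataTransferProtocol.OP_BLOCK_CHECKSUM);
    out.writeLong(block.getBlockId());
    out.writeLong(block.getGenerationStamp());
    lb.getAccessToken().write(out);
    out.flush();

    // After: one shared helper owns the version, opcode and field order.
    DataTransferProtocol.Sender.opBlockChecksum(out, block.getBlockId(),
        block.getGenerationStamp(), lb.getAccessToken());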
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jun 23 04:25:27 2009
@@ -616,13 +616,9 @@
               + DataTransferProtocol.OP_BLOCK_CHECKSUM
               + ", block=" + block);
         }
-        out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-        out.write(DataTransferProtocol.OP_BLOCK_CHECKSUM);
-        out.writeLong(block.getBlockId());
-        out.writeLong(block.getGenerationStamp());
-        lb.getAccessToken().write(out);
-        out.flush();
-
+        DataTransferProtocol.Sender.opBlockChecksum(out, block.getBlockId(),
+            block.getGenerationStamp(), lb.getAccessToken());
+
         final short reply = in.readShort();
         if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
           if (reply == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN
@@ -1307,19 +1303,10 @@
         String clientName)
         throws IOException {
       // in and out will be closed when sock is closed (by the caller)
-      DataOutputStream out = new DataOutputStream(
-        new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
-
-      //write the header.
-      out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
-      out.write( DataTransferProtocol.OP_READ_BLOCK );
-      out.writeLong( blockId );
-      out.writeLong( genStamp );
-      out.writeLong( startOffset );
-      out.writeLong( len );
-      Text.writeString(out, clientName);
-      accessToken.write(out);
-      out.flush();
+      DataTransferProtocol.Sender.opReadBlock(
+          new DataOutputStream(new BufferedOutputStream(
+              NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
+          blockId, genStamp, startOffset, len, clientName, accessToken);
 
       //
       // Get bytes in block, set streams
       //
@@ -2731,19 +2718,9 @@
               DataNode.SMALL_BUFFER_SIZE));
           blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
-          out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-          out.write(DataTransferProtocol.OP_WRITE_BLOCK);
-          out.writeLong(block.getBlockId());
-          out.writeLong(block.getGenerationStamp());
-          out.writeInt(nodes.length);
-          out.writeBoolean(recoveryFlag); // recovery flag
-          Text.writeString(out, client);
-          out.writeBoolean(false); // Not sending src node information
-          out.writeInt(nodes.length - 1);
-          for (int i = 1; i < nodes.length; i++) {
-            nodes[i].write(out);
-          }
-          accessToken.write(out);
+          DataTransferProtocol.Sender.opWriteBlock(out,
+              block.getBlockId(), block.getGenerationStamp(), nodes.length,
+              recoveryFlag, client, null, nodes, accessToken);
           checksum.writeHeader(out);
           out.flush();
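Note: with the Sender in place, a client only needs a connected socket; the helper writes the two-byte protocol version, the one-byte opcode and the operation's fields, then flushes. A sketch of the new read path (sock, blockId, genStamp, startOffset, len, clientName and accessToken are assumed to be set up as in the DFSClient code above):

    // Frame an OP_READ_BLOCK request; the reply (a status code followed by
    // the block data) is then read from the socket's input stream as before.
    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
        NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));
    DataTransferProtocol.Sender.opReadBlock(out,
        blockId, genStamp, startOffset, len, clientName, accessToken);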
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue Jun 23 04:25:27 2009
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.AccessToken;
+
 /**
- *
- * The Client transfers data to/from datanode using a streaming protocol.
- *
+ * Transfer data to/from datanode using a streaming protocol.
  */
 public interface DataTransferProtocol {
 
@@ -57,5 +61,196 @@
   public static final int OP_STATUS_CHECKSUM_OK = 6;
 
-
+  /** Sender */
+  public static class Sender {
+    /** Initialize a operation. */
+    public static void op(DataOutputStream out, int op) throws IOException {
+      out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+      out.write(op);
+    }
+
+    /** Send OP_READ_BLOCK */
+    public static void opReadBlock(DataOutputStream out,
+        long blockId, long blockGs, long blockOffset, long blockLen,
+        String clientName, AccessToken accessToken) throws IOException {
+      op(out, OP_READ_BLOCK);
+
+      out.writeLong(blockId);
+      out.writeLong(blockGs);
+      out.writeLong(blockOffset);
+      out.writeLong(blockLen);
+      Text.writeString(out, clientName);
+      accessToken.write(out);
+      out.flush();
+    }
+
+    /** Send OP_WRITE_BLOCK */
+    public static void opWriteBlock(DataOutputStream out,
+        long blockId, long blockGs, int pipelineSize, boolean isRecovery,
+        String client, DatanodeInfo src, DatanodeInfo[] targets,
+        AccessToken accesstoken) throws IOException {
+      op(out, OP_WRITE_BLOCK);
+
+      out.writeLong(blockId);
+      out.writeLong(blockGs);
+      out.writeInt(pipelineSize);
+      out.writeBoolean(isRecovery);
+      Text.writeString(out, client);
+
+      out.writeBoolean(src != null);
+      if (src != null) {
+        src.write(out);
+      }
+      out.writeInt(targets.length - 1);
+      for (int i = 1; i < targets.length; i++) {
+        targets[i].write(out);
+      }
+
+      accesstoken.write(out);
+    }
+
+    /** Send OP_REPLACE_BLOCK */
+    public static void opReplaceBlock(DataOutputStream out,
+        long blockId, long blockGs, String storageId, DatanodeInfo src,
+        AccessToken accesstoken) throws IOException {
+      op(out, OP_REPLACE_BLOCK);
+
+      out.writeLong(blockId);
+      out.writeLong(blockGs);
+      Text.writeString(out, storageId);
+      src.write(out);
+      accesstoken.write(out);
+      out.flush();
+    }
+
+    /** Send OP_COPY_BLOCK */
+    public static void opCopyBlock(DataOutputStream out,
+        long blockId, long blockGs, AccessToken accesstoken) throws IOException {
+      op(out, OP_COPY_BLOCK);
+
+      out.writeLong(blockId);
+      out.writeLong(blockGs);
+      accesstoken.write(out);
+      out.flush();
+    }
+
+    /** Send OP_BLOCK_CHECKSUM */
+    public static void opBlockChecksum(DataOutputStream out,
+        long blockId, long blockGs, AccessToken accesstoken) throws IOException {
+      op(out, OP_BLOCK_CHECKSUM);
+
+      out.writeLong(blockId);
+      out.writeLong(blockGs);
+      accesstoken.write(out);
+      out.flush();
+    }
+  }
+
+  /** Receiver */
+  public static abstract class Receiver {
+    /** Initialize a operation. */
+    public final byte op(DataInputStream in) throws IOException {
+      final short version = in.readShort();
+      if (version != DATA_TRANSFER_VERSION) {
+        throw new IOException( "Version Mismatch" );
+      }
+      return in.readByte();
+    }
+
+    /** Receive OP_READ_BLOCK */
+    public final void opReadBlock(DataInputStream in) throws IOException {
+      final long blockId = in.readLong();
+      final long blockGs = in.readLong();
+      final long offset = in.readLong();
+      final long length = in.readLong();
+      final String client = Text.readString(in);
+      final AccessToken accesstoken = readAccessToken(in);
+
+      opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
+    }
+
+    /** Abstract OP_READ_BLOCK method. */
+    public abstract void opReadBlock(DataInputStream in,
+        long blockId, long blockGs, long offset, long length,
+        String client, AccessToken accesstoken) throws IOException;
+
+    /** Receive OP_WRITE_BLOCK */
+    public final void opWriteBlock(DataInputStream in) throws IOException {
+      final long blockId = in.readLong();
+      final long blockGs = in.readLong();
+      final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
+      final boolean isRecovery = in.readBoolean(); // is this part of recovery?
+      final String client = Text.readString(in); // working on behalf of this client
+      final DatanodeInfo src = in.readBoolean()? DatanodeInfo.read(in): null;
+
+      final int nTargets = in.readInt();
+      if (nTargets < 0) {
+        throw new IOException("Mislabelled incoming datastream.");
+      }
+      final DatanodeInfo targets[] = new DatanodeInfo[nTargets];
+      for (int i = 0; i < targets.length; i++) {
+        targets[i] = DatanodeInfo.read(in);
+      }
+      final AccessToken accesstoken = readAccessToken(in);
+
+      opWriteBlock(in, blockId, blockGs, pipelineSize, isRecovery,
+          client, src, targets, accesstoken);
+    }
+
+    /** Abstract OP_WRITE_BLOCK method. */
+    public abstract void opWriteBlock(DataInputStream in,
+        long blockId, long blockGs, int pipelineSize, boolean isRecovery,
+        String client, DatanodeInfo src, DatanodeInfo[] targets,
+        AccessToken accesstoken) throws IOException;
+
+    /** Receive OP_REPLACE_BLOCK */
+    public final void opReplaceBlock(DataInputStream in) throws IOException {
+      final long blockId = in.readLong();
+      final long blockGs = in.readLong();
+      final String sourceId = Text.readString(in); // read del hint
+      final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
+      final AccessToken accesstoken = readAccessToken(in);
+
+      opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
+    }
+
+    /** Abstract OP_REPLACE_BLOCK method. */
+    public abstract void opReplaceBlock(DataInputStream in,
+        long blockId, long blockGs, String sourceId, DatanodeInfo src,
+        AccessToken accesstoken) throws IOException;
+
+    /** Receive OP_COPY_BLOCK */
+    public final void opCopyBlock(DataInputStream in) throws IOException {
+      final long blockId = in.readLong();
+      final long blockGs = in.readLong();
+      final AccessToken accesstoken = readAccessToken(in);
+
+      opCopyBlock(in, blockId, blockGs, accesstoken);
+    }
+
+    /** Abstract OP_COPY_BLOCK method. */
+    public abstract void opCopyBlock(DataInputStream in,
+        long blockId, long blockGs, AccessToken accesstoken) throws IOException;
+
+    /** Receive OP_BLOCK_CHECKSUM */
+    public final void opBlockChecksum(DataInputStream in) throws IOException {
+      final long blockId = in.readLong();
+      final long blockGs = in.readLong();
+      final AccessToken accesstoken = readAccessToken(in);
+
+      opBlockChecksum(in, blockId, blockGs, accesstoken);
+    }
+
+    /** Abstract OP_BLOCK_CHECKSUM method. */
+    public abstract void opBlockChecksum(DataInputStream in,
+        long blockId, long blockGs, AccessToken accesstoken) throws IOException;
+
+    /** Read an AccessToken */
+    static private AccessToken readAccessToken(DataInputStream in
+        ) throws IOException {
+      final AccessToken t = new AccessToken();
+      t.readFields(in);
+      return t;
+    }
+  }
 }
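Note: the Receiver mirrors the Sender: the final opXxx(DataInputStream) methods decode a request's fields and hand them to the abstract callbacks, so a server subclass never touches the wire format. A hypothetical, minimal subclass and serving loop for illustration only (DataXceiver below is the real implementation; LoggingReceiver is left abstract so the remaining callbacks need not be spelled out here):

    abstract class LoggingReceiver extends DataTransferProtocol.Receiver {
      @Override
      public void opReadBlock(DataInputStream in, long blockId, long blockGs,
          long offset, long length, String client, AccessToken accesstoken)
          throws IOException {
        // All header fields arrive already decoded.
        System.out.println("OP_READ_BLOCK block=" + blockId + " offset="
            + offset + " length=" + length + " client=" + client);
      }
    }

    // Serving loop: op(in) checks DATA_TRANSFER_VERSION and returns the
    // opcode; the final opXxx(in) overload parses the fields and calls back.
    final byte op = receiver.op(in);
    if (op == DataTransferProtocol.OP_READ_BLOCK) {
      receiver.opReadBlock(in);
    }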
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Tue Jun 23 04:25:27 2009
@@ -343,6 +343,13 @@
     setAdminState(WritableUtils.readEnum(in, AdminStates.class));
   }
 
+  /** Read a DatanodeInfo */
+  public static DatanodeInfo read(DataInput in) throws IOException {
+    final DatanodeInfo d = new DatanodeInfo();
+    d.readFields(in);
+    return d;
+  }
+
   @Override
   public int hashCode() {
     // Super implementation is sufficient

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Jun 23 04:25:27 2009
@@ -19,9 +19,7 @@
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
-import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -67,8 +65,6 @@
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -367,20 +363,15 @@
 
     /* Send a block replace request to the output stream*/
     private void sendRequest(DataOutputStream out) throws IOException {
-      out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-      out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);
-      out.writeLong(block.getBlock().getBlockId());
-      out.writeLong(block.getBlock().getGenerationStamp());
-      Text.writeString(out, source.getStorageID());
-      proxySource.getDatanode().write(out);
       AccessToken accessToken = AccessToken.DUMMY_TOKEN;
       if (isAccessTokenEnabled) {
         accessToken = accessTokenHandler.generateToken(null, block.getBlock()
             .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,
             AccessTokenHandler.AccessMode.COPY));
       }
-      accessToken.write(out);
-      out.flush();
+      DataTransferProtocol.Sender.opReplaceBlock(out,
+          block.getBlock().getBlockId(), block.getBlock().getGenerationStamp(),
+          source.getStorageID(), proxySource.getDatanode(), accessToken);
     }
 
     /* Receive a block copy response from the input stream */

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL:
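Note: DatanodeInfo.read is the decoding twin of the existing Writable write; Receiver.opWriteBlock and opReplaceBlock use it to rebuild the src node and pipeline targets. The wire shape of the balancer's replace request, read directly off Sender.opReplaceBlock above, plus a hypothetical round trip (buffer setup assumed):

    // OP_REPLACE_BLOCK request as framed by Sender.opReplaceBlock:
    //   short        DATA_TRANSFER_VERSION
    //   byte         OP_REPLACE_BLOCK
    //   long         block id
    //   long         generation stamp
    //   Text         storageId (the delete hint)
    //   DatanodeInfo src (the proxy source, decoded via DatanodeInfo.read)
    //   AccessToken  access token
    // ...followed by a flush.

    // Hypothetical round trip showing read() undoing write():
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    datanodeInfo.write(new DataOutputStream(buf));
    DatanodeInfo copy = DatanodeInfo.read(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));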
http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Jun 23 04:25:27 2009
@@ -56,11 +56,11 @@
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
@@ -72,13 +72,12 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
-import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -94,8 +93,8 @@
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -1216,26 +1215,15 @@
       //
       // Header info
       //
-      out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-      out.writeByte(DataTransferProtocol.OP_WRITE_BLOCK);
-      out.writeLong(b.getBlockId());
-      out.writeLong(b.getGenerationStamp());
-      out.writeInt(0);           // no pipelining
-      out.writeBoolean(false);   // not part of recovery
-      Text.writeString(out, ""); // client
-      out.writeBoolean(true); // sending src node information
-      srcNode.write(out); // Write src node DatanodeInfo
-      // write targets
-      out.writeInt(targets.length - 1);
-      for (int i = 1; i < targets.length; i++) {
-        targets[i].write(out);
-      }
       AccessToken accessToken = AccessToken.DUMMY_TOKEN;
       if (isAccessTokenEnabled) {
         accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
             EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
       }
-      accessToken.write(out);
+      DataTransferProtocol.Sender.opWriteBlock(out,
+          b.getBlockId(), b.getGenerationStamp(), 0, false, "",
+          srcNode, targets, accessToken);
+
       // send data & checksum
       blockSender.sendBlock(out, baseStream, null);
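Note: in DataNode.transferBlock the arguments to Sender.opWriteBlock map one-to-one onto the constants the deleted lines used to write, and DataXceiver below consumes the same format via Receiver.opWriteBlock. The call, annotated for readability (no behavior change is intended by this restatement):

    // Datanode-to-datanode transfer: no client, no pipeline, not recovery.
    DataTransferProtocol.Sender.opWriteBlock(out,
        b.getBlockId(), b.getGenerationStamp(),
        0,         // pipelineSize: "no pipelining"
        false,     // isRecovery: "not part of recovery"
        "",        // client: empty for inter-datanode transfers
        srcNode,   // non-null src => "sending src node information"
        targets, accessToken);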
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Jun 23 04:25:27 2009
@@ -47,7 +47,8 @@
 /**
  * Thread for processing incoming/outgoing data stream.
  */
-class DataXceiver implements Runnable, FSConstants {
+class DataXceiver extends DataTransferProtocol.Receiver
+    implements Runnable, FSConstants {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
 
@@ -78,12 +79,8 @@
       in = new DataInputStream(
           new BufferedInputStream(NetUtils.getInputStream(s),
                                   SMALL_BUFFER_SIZE));
-      short version = in.readShort();
-      if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
-        throw new IOException( "Version Mismatch" );
-      }
+      final byte op = op(in);
       boolean local = s.getInetAddress().equals(s.getLocalAddress());
-      byte op = in.readByte();
       // Make sure the xciver count is not exceeded
       int curXceiverCount = datanode.getXceiverCount();
       if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
@@ -94,7 +91,7 @@
       long startTime = DataNode.now();
       switch ( op ) {
       case DataTransferProtocol.OP_READ_BLOCK:
-        readBlock( in );
+        opReadBlock(in);
         datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
         if (local)
           datanode.myMetrics.readsFromLocalClient.inc();
@@ -102,7 +99,7 @@
           datanode.myMetrics.readsFromRemoteClient.inc();
         break;
       case DataTransferProtocol.OP_WRITE_BLOCK:
-        writeBlock( in );
+        opWriteBlock(in);
         datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
         if (local)
           datanode.myMetrics.writesFromLocalClient.inc();
@@ -110,16 +107,16 @@
           datanode.myMetrics.writesFromRemoteClient.inc();
         break;
       case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
-        replaceBlock(in);
+        opReplaceBlock(in);
         datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
         break;
       case DataTransferProtocol.OP_COPY_BLOCK: // for balancing purpose; send to a proxy source
-        copyBlock(in);
+        opCopyBlock(in);
         datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
         break;
       case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
-        getBlockChecksum(in);
+        opBlockChecksum(in);
         datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
         break;
       default:
@@ -138,21 +135,12 @@
   /**
    * Read a block from the disk.
-   * @param in The stream to read from
-   * @throws IOException
    */
-  private void readBlock(DataInputStream in) throws IOException {
-    //
-    // Read in the header
-    //
-    long blockId = in.readLong();
-    Block block = new Block( blockId, 0 , in.readLong());
-
-    long startOffset = in.readLong();
-    long length = in.readLong();
-    String clientName = Text.readString(in);
-    AccessToken accessToken = new AccessToken();
-    accessToken.readFields(in);
+  @Override
+  public void opReadBlock(DataInputStream in,
+      long blockId, long blockGs, long startOffset, long length,
+      String clientName, AccessToken accessToken) throws IOException {
+    final Block block = new Block(blockId, 0 , blockGs);
 
     OutputStream baseStream = NetUtils.getOutputStream(s,
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
@@ -224,42 +212,24 @@
   /**
    * Write a block to disk.
-   *
-   * @param in The stream to read from
-   * @throws IOException
    */
-  private void writeBlock(DataInputStream in) throws IOException {
-    DatanodeInfo srcDataNode = null;
-    LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
-              " tcp no delay " + s.getTcpNoDelay());
-    //
-    // Read in the header
-    //
-    Block block = new Block(in.readLong(),
-        dataXceiverServer.estimateBlockSize, in.readLong());
+  @Override
+  public void opWriteBlock(DataInputStream in, long blockId, long blockGs,
+      int pipelineSize, boolean isRecovery,
+      String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
+      AccessToken accessToken) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
+                " tcp no delay " + s.getTcpNoDelay());
+    }
+
+    final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
+        blockGs);
     LOG.info("Receiving block " + block +
              " src: " + remoteAddress +
              " dest: " + localAddress);
-    int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
-    boolean isRecovery = in.readBoolean(); // is this part of recovery?
-    String client = Text.readString(in); // working on behalf of this client
-    boolean hasSrcDataNode = in.readBoolean(); // is src node info present
-    if (hasSrcDataNode) {
-      srcDataNode = new DatanodeInfo();
-      srcDataNode.readFields(in);
-    }
-    int numTargets = in.readInt();
-    if (numTargets < 0) {
-      throw new IOException("Mislabelled incoming datastream.");
-    }
-    DatanodeInfo targets[] = new DatanodeInfo[numTargets];
-    for (int i = 0; i < targets.length; i++) {
-      DatanodeInfo tmp = new DatanodeInfo();
-      tmp.readFields(in);
-      targets[i] = tmp;
-    }
-    AccessToken accessToken = new AccessToken();
-    accessToken.readFields(in);
+
     DataOutputStream replyOut = null;   // stream to prev target
     replyOut = new DataOutputStream(
                    NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
@@ -302,9 +272,9 @@
         mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
         mirrorSock = datanode.newSocket();
         try {
-          int timeoutValue = numTargets * datanode.socketTimeout;
+          int timeoutValue = targets.length * datanode.socketTimeout;
           int writeTimeout = datanode.socketWriteTimeout +
-                             (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
+                             (HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
           NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
           mirrorSock.setSoTimeout(timeoutValue);
           mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
@@ -315,22 +285,9 @@
           mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
 
           // Write header: Copied from DFSClient.java!
-          mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
-          mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );
-          mirrorOut.writeLong( block.getBlockId() );
-          mirrorOut.writeLong( block.getGenerationStamp() );
-          mirrorOut.writeInt( pipelineSize );
-          mirrorOut.writeBoolean( isRecovery );
-          Text.writeString( mirrorOut, client );
-          mirrorOut.writeBoolean(hasSrcDataNode);
-          if (hasSrcDataNode) { // pass src node information
-            srcDataNode.write(mirrorOut);
-          }
-          mirrorOut.writeInt( targets.length - 1 );
-          for ( int i = 1; i < targets.length; i++ ) {
-            targets[i].write( mirrorOut );
-          }
-          accessToken.write(mirrorOut);
+          DataTransferProtocol.Sender.opWriteBlock(mirrorOut,
+              block.getBlockId(), block.getGenerationStamp(), pipelineSize,
+              isRecovery, client, srcDataNode, targets, accessToken);
 
           blockReceiver.writeChecksumHeader(mirrorOut);
           mirrorOut.flush();
@@ -414,12 +371,11 @@
 
   /**
    * Get block checksum (MD5 of CRC32).
-   * @param in
    */
-  void getBlockChecksum(DataInputStream in) throws IOException {
-    final Block block = new Block(in.readLong(), 0 , in.readLong());
-    AccessToken accessToken = new AccessToken();
-    accessToken.readFields(in);
+  @Override
+  public void opBlockChecksum(DataInputStream in,
+      long blockId, long blockGs, AccessToken accessToken) throws IOException {
+    final Block block = new Block(blockId, 0 , blockGs);
 
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
         datanode.socketWriteTimeout));
     if (datanode.isAccessTokenEnabled
@@ -471,16 +427,12 @@
 
   /**
    * Read a block from the disk and then sends it to a destination.
-   *
-   * @param in The stream to read from
-   * @throws IOException
    */
-  private void copyBlock(DataInputStream in) throws IOException {
+  @Override
+  public void opCopyBlock(DataInputStream in,
+      long blockId, long blockGs, AccessToken accessToken) throws IOException {
     // Read in the header
-    long blockId = in.readLong(); // read block id
-    Block block = new Block(blockId, 0, in.readLong());
-    AccessToken accessToken = new AccessToken();
-    accessToken.readFields(in);
+    Block block = new Block(blockId, 0, blockGs);
     if (datanode.isAccessTokenEnabled
         && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
             AccessTokenHandler.AccessMode.COPY)) {
@@ -545,20 +497,14 @@
 
   /**
    * Receive a block and write it to disk, it then notifies the namenode to
    * remove the copy from the source.
-   *
-   * @param in The stream to read from
-   * @throws IOException
    */
-  private void replaceBlock(DataInputStream in) throws IOException {
+  @Override
+  public void opReplaceBlock(DataInputStream in,
+      long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
+      AccessToken accessToken) throws IOException {
     /* read header */
-    long blockId = in.readLong();
-    Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
-        in.readLong()); // block id & generation stamp
-    String sourceID = Text.readString(in); // read del hint
-    DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
-    proxySource.readFields(in);
-    AccessToken accessToken = new AccessToken();
-    accessToken.readFields(in);
+    final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
+        blockGs);
 
     if (datanode.isAccessTokenEnabled
         && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
            AccessTokenHandler.AccessMode.REPLACE)) {
@@ -597,12 +543,8 @@
           new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
 
      /* send request to the proxy */
-      proxyOut.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // transfer version
-      proxyOut.writeByte(DataTransferProtocol.OP_COPY_BLOCK); // op code
-      proxyOut.writeLong(block.getBlockId()); // block id
-      proxyOut.writeLong(block.getGenerationStamp()); // block id
-      accessToken.write(proxyOut);
-      proxyOut.flush();
+      DataTransferProtocol.Sender.opCopyBlock(proxyOut, block.getBlockId(),
+          block.getGenerationStamp(), accessToken);
 
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(