Author: szetszwo
Date: Tue Jun 23 04:25:27 2009
New Revision: 787537
URL: http://svn.apache.org/viewvc?rev=787537&view=rev
Log:
HDFS-377. Separate codes which implement DataTransferProtocol.
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Jun 23 04:25:27 2009
@@ -14,3 +14,6 @@
HADOOP-4687. HDFS is split from Hadoop Core. It is a subproject under
Hadoop (Owen O'Malley)
+
+ HDFS-377. Separate codes which implement DataTransferProtocol.
+ (szetszwo)
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jun 23 04:25:27 2009
@@ -616,13 +616,9 @@
+ DataTransferProtocol.OP_BLOCK_CHECKSUM +
", block=" + block);
}
- out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
- out.write(DataTransferProtocol.OP_BLOCK_CHECKSUM);
- out.writeLong(block.getBlockId());
- out.writeLong(block.getGenerationStamp());
- lb.getAccessToken().write(out);
- out.flush();
-
+ DataTransferProtocol.Sender.opBlockChecksum(out, block.getBlockId(),
+ block.getGenerationStamp(), lb.getAccessToken());
+
final short reply = in.readShort();
if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
if (reply == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN
@@ -1307,19 +1303,10 @@
String clientName)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
- DataOutputStream out = new DataOutputStream(
- new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
-
- //write the header.
- out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
- out.write( DataTransferProtocol.OP_READ_BLOCK );
- out.writeLong( blockId );
- out.writeLong( genStamp );
- out.writeLong( startOffset );
- out.writeLong( len );
- Text.writeString(out, clientName);
- accessToken.write(out);
- out.flush();
+ DataTransferProtocol.Sender.opReadBlock(
+ new DataOutputStream(new BufferedOutputStream(
+ NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
+ blockId, genStamp, startOffset, len, clientName, accessToken);
//
// Get bytes in block, set streams
@@ -2731,19 +2718,9 @@
DataNode.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
- out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
- out.write(DataTransferProtocol.OP_WRITE_BLOCK);
- out.writeLong(block.getBlockId());
- out.writeLong(block.getGenerationStamp());
- out.writeInt(nodes.length);
- out.writeBoolean(recoveryFlag); // recovery flag
- Text.writeString(out, client);
- out.writeBoolean(false); // Not sending src node information
- out.writeInt(nodes.length - 1);
- for (int i = 1; i < nodes.length; i++) {
- nodes[i].write(out);
- }
- accessToken.write(out);
+ DataTransferProtocol.Sender.opWriteBlock(out,
+ block.getBlockId(), block.getGenerationStamp(), nodes.length,
+ recoveryFlag, client, null, nodes, accessToken);
checksum.writeHeader(out);
out.flush();
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue
Jun 23 04:25:27 2009
@@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hdfs.protocol;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.AccessToken;
/**
- *
- * The Client transfers data to/from datanode using a streaming protocol.
- *
+ * Transfer data to/from datanode using a streaming protocol.
*/
public interface DataTransferProtocol {
@@ -57,5 +61,196 @@
public static final int OP_STATUS_CHECKSUM_OK = 6;
-
+ /** Sender */
+ public static class Sender {
+ /** Initialize a operation. */
+ public static void op(DataOutputStream out, int op) throws IOException {
+ out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+ out.write(op);
+ }
+
+ /** Send OP_READ_BLOCK */
+ public static void opReadBlock(DataOutputStream out,
+ long blockId, long blockGs, long blockOffset, long blockLen,
+ String clientName, AccessToken accessToken) throws IOException {
+ op(out, OP_READ_BLOCK);
+
+ out.writeLong(blockId);
+ out.writeLong(blockGs);
+ out.writeLong(blockOffset);
+ out.writeLong(blockLen);
+ Text.writeString(out, clientName);
+ accessToken.write(out);
+ out.flush();
+ }
+
+ /** Send OP_WRITE_BLOCK */
+ public static void opWriteBlock(DataOutputStream out,
+ long blockId, long blockGs, int pipelineSize, boolean isRecovery,
+ String client, DatanodeInfo src, DatanodeInfo[] targets,
+ AccessToken accesstoken) throws IOException {
+ op(out, OP_WRITE_BLOCK);
+
+ out.writeLong(blockId);
+ out.writeLong(blockGs);
+ out.writeInt(pipelineSize);
+ out.writeBoolean(isRecovery);
+ Text.writeString(out, client);
+
+ out.writeBoolean(src != null);
+ if (src != null) {
+ src.write(out);
+ }
+ out.writeInt(targets.length - 1);
+ for (int i = 1; i < targets.length; i++) {
+ targets[i].write(out);
+ }
+
+ accesstoken.write(out);
+ }
+
+ /** Send OP_REPLACE_BLOCK */
+ public static void opReplaceBlock(DataOutputStream out,
+ long blockId, long blockGs, String storageId, DatanodeInfo src,
+ AccessToken accesstoken) throws IOException {
+ op(out, OP_REPLACE_BLOCK);
+
+ out.writeLong(blockId);
+ out.writeLong(blockGs);
+ Text.writeString(out, storageId);
+ src.write(out);
+ accesstoken.write(out);
+ out.flush();
+ }
+
+ /** Send OP_COPY_BLOCK */
+ public static void opCopyBlock(DataOutputStream out,
+ long blockId, long blockGs, AccessToken accesstoken) throws IOException {
+ op(out, OP_COPY_BLOCK);
+
+ out.writeLong(blockId);
+ out.writeLong(blockGs);
+ accesstoken.write(out);
+ out.flush();
+ }
+
+ /** Send OP_BLOCK_CHECKSUM */
+ public static void opBlockChecksum(DataOutputStream out,
+ long blockId, long blockGs, AccessToken accesstoken) throws IOException {
+ op(out, OP_BLOCK_CHECKSUM);
+
+ out.writeLong(blockId);
+ out.writeLong(blockGs);
+ accesstoken.write(out);
+ out.flush();
+ }
+ }
+
+ /** Receiver */
+ public static abstract class Receiver {
+ /** Initialize a operation. */
+ public final byte op(DataInputStream in) throws IOException {
+ final short version = in.readShort();
+ if (version != DATA_TRANSFER_VERSION) {
+ throw new IOException( "Version Mismatch" );
+ }
+ return in.readByte();
+ }
+
+ /** Receive OP_READ_BLOCK */
+ public final void opReadBlock(DataInputStream in) throws IOException {
+ final long blockId = in.readLong();
+ final long blockGs = in.readLong();
+ final long offset = in.readLong();
+ final long length = in.readLong();
+ final String client = Text.readString(in);
+ final AccessToken accesstoken = readAccessToken(in);
+
+ opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
+ }
+
+ /** Abstract OP_READ_BLOCK method. */
+ public abstract void opReadBlock(DataInputStream in,
+ long blockId, long blockGs, long offset, long length,
+ String client, AccessToken accesstoken) throws IOException;
+
+ /** Receive OP_WRITE_BLOCK */
+ public final void opWriteBlock(DataInputStream in) throws IOException {
+ final long blockId = in.readLong();
+ final long blockGs = in.readLong();
+ final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
+ final boolean isRecovery = in.readBoolean(); // is this part of recovery?
+ final String client = Text.readString(in); // working on behalf of this client
+ final DatanodeInfo src = in.readBoolean()? DatanodeInfo.read(in): null;
+
+ final int nTargets = in.readInt();
+ if (nTargets < 0) {
+ throw new IOException("Mislabelled incoming datastream.");
+ }
+ final DatanodeInfo targets[] = new DatanodeInfo[nTargets];
+ for (int i = 0; i < targets.length; i++) {
+ targets[i] = DatanodeInfo.read(in);
+ }
+ final AccessToken accesstoken = readAccessToken(in);
+
+ opWriteBlock(in, blockId, blockGs, pipelineSize, isRecovery,
+ client, src, targets, accesstoken);
+ }
+
+ /** Abstract OP_WRITE_BLOCK method. */
+ public abstract void opWriteBlock(DataInputStream in,
+ long blockId, long blockGs, int pipelineSize, boolean isRecovery,
+ String client, DatanodeInfo src, DatanodeInfo[] targets,
+ AccessToken accesstoken) throws IOException;
+
+ /** Receive OP_REPLACE_BLOCK */
+ public final void opReplaceBlock(DataInputStream in) throws IOException {
+ final long blockId = in.readLong();
+ final long blockGs = in.readLong();
+ final String sourceId = Text.readString(in); // read del hint
+ final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
+ final AccessToken accesstoken = readAccessToken(in);
+
+ opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
+ }
+
+ /** Abstract OP_REPLACE_BLOCK method. */
+ public abstract void opReplaceBlock(DataInputStream in,
+ long blockId, long blockGs, String sourceId, DatanodeInfo src,
+ AccessToken accesstoken) throws IOException;
+
+ /** Receive OP_COPY_BLOCK */
+ public final void opCopyBlock(DataInputStream in) throws IOException {
+ final long blockId = in.readLong();
+ final long blockGs = in.readLong();
+ final AccessToken accesstoken = readAccessToken(in);
+
+ opCopyBlock(in, blockId, blockGs, accesstoken);
+ }
+
+ /** Abstract OP_COPY_BLOCK method. */
+ public abstract void opCopyBlock(DataInputStream in,
+ long blockId, long blockGs, AccessToken accesstoken) throws IOException;
+
+ /** Receive OP_BLOCK_CHECKSUM */
+ public final void opBlockChecksum(DataInputStream in) throws IOException {
+ final long blockId = in.readLong();
+ final long blockGs = in.readLong();
+ final AccessToken accesstoken = readAccessToken(in);
+
+ opBlockChecksum(in, blockId, blockGs, accesstoken);
+ }
+
+ /** Abstract OP_BLOCK_CHECKSUM method. */
+ public abstract void opBlockChecksum(DataInputStream in,
+ long blockId, long blockGs, AccessToken accesstoken) throws IOException;
+
+ /** Read an AccessToken */
+ static private AccessToken readAccessToken(DataInputStream in
+ ) throws IOException {
+ final AccessToken t = new AccessToken();
+ t.readFields(in);
+ return t;
+ }
+ }
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Tue Jun 23
04:25:27 2009
@@ -343,6 +343,13 @@
setAdminState(WritableUtils.readEnum(in, AdminStates.class));
}
+ /** Read a DatanodeInfo */
+ public static DatanodeInfo read(DataInput in) throws IOException {
+ final DatanodeInfo d = new DatanodeInfo();
+ d.readFields(in);
+ return d;
+ }
+
@Override
public int hashCode() {
// Super implementation is sufficient
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Jun
23 04:25:27 2009
@@ -19,9 +19,7 @@
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
-import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -67,8 +65,6 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
@@ -367,20 +363,15 @@
/* Send a block replace request to the output stream*/
private void sendRequest(DataOutputStream out) throws IOException {
- out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
- out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);
- out.writeLong(block.getBlock().getBlockId());
- out.writeLong(block.getBlock().getGenerationStamp());
- Text.writeString(out, source.getStorageID());
- proxySource.getDatanode().write(out);
AccessToken accessToken = AccessToken.DUMMY_TOKEN;
if (isAccessTokenEnabled) {
accessToken = accessTokenHandler.generateToken(null, block.getBlock()
.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,
AccessTokenHandler.AccessMode.COPY));
}
- accessToken.write(out);
- out.flush();
+ DataTransferProtocol.Sender.opReplaceBlock(out,
+ block.getBlock().getBlockId(), block.getBlock().getGenerationStamp(),
+ source.getStorageID(), proxySource.getDatanode(), accessToken);
}
/* Receive a block copy response from the input stream */
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Jun
23 04:25:27 2009
@@ -56,11 +56,11 @@
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
@@ -72,13 +72,12 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
-import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
@@ -94,8 +93,8 @@
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -1216,26 +1215,15 @@
//
// Header info
//
- out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
- out.writeByte(DataTransferProtocol.OP_WRITE_BLOCK);
- out.writeLong(b.getBlockId());
- out.writeLong(b.getGenerationStamp());
- out.writeInt(0); // no pipelining
- out.writeBoolean(false); // not part of recovery
- Text.writeString(out, ""); // client
- out.writeBoolean(true); // sending src node information
- srcNode.write(out); // Write src node DatanodeInfo
- // write targets
- out.writeInt(targets.length - 1);
- for (int i = 1; i < targets.length; i++) {
- targets[i].write(out);
- }
AccessToken accessToken = AccessToken.DUMMY_TOKEN;
if (isAccessTokenEnabled) {
accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
}
- accessToken.write(out);
+ DataTransferProtocol.Sender.opWriteBlock(out,
+ b.getBlockId(), b.getGenerationStamp(), 0, false, "",
+ srcNode, targets, accessToken);
+
// send data & checksum
blockSender.sendBlock(out, baseStream, null);
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=787537&r1=787536&r2=787537&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue
Jun 23 04:25:27 2009
@@ -47,7 +47,8 @@
/**
* Thread for processing incoming/outgoing data stream.
*/
-class DataXceiver implements Runnable, FSConstants {
+class DataXceiver extends DataTransferProtocol.Receiver
+ implements Runnable, FSConstants {
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
@@ -78,12 +79,8 @@
in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(s),
SMALL_BUFFER_SIZE));
- short version = in.readShort();
- if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
- throw new IOException( "Version Mismatch" );
- }
+ final byte op = op(in);
boolean local = s.getInetAddress().equals(s.getLocalAddress());
- byte op = in.readByte();
// Make sure the xciver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
@@ -94,7 +91,7 @@
long startTime = DataNode.now();
switch ( op ) {
case DataTransferProtocol.OP_READ_BLOCK:
- readBlock( in );
+ opReadBlock(in);
datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
if (local)
datanode.myMetrics.readsFromLocalClient.inc();
@@ -102,7 +99,7 @@
datanode.myMetrics.readsFromRemoteClient.inc();
break;
case DataTransferProtocol.OP_WRITE_BLOCK:
- writeBlock( in );
+ opWriteBlock(in);
datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
if (local)
datanode.myMetrics.writesFromLocalClient.inc();
@@ -110,16 +107,16 @@
datanode.myMetrics.writesFromRemoteClient.inc();
break;
case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
- replaceBlock(in);
+ opReplaceBlock(in);
datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
break;
case DataTransferProtocol.OP_COPY_BLOCK:
// for balancing purpose; send to a proxy source
- copyBlock(in);
+ opCopyBlock(in);
datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
break;
case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
- getBlockChecksum(in);
+ opBlockChecksum(in);
datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
break;
default:
@@ -138,21 +135,12 @@
/**
* Read a block from the disk.
- * @param in The stream to read from
- * @throws IOException
*/
- private void readBlock(DataInputStream in) throws IOException {
- //
- // Read in the header
- //
- long blockId = in.readLong();
- Block block = new Block( blockId, 0 , in.readLong());
-
- long startOffset = in.readLong();
- long length = in.readLong();
- String clientName = Text.readString(in);
- AccessToken accessToken = new AccessToken();
- accessToken.readFields(in);
+ @Override
+ public void opReadBlock(DataInputStream in,
+ long blockId, long blockGs, long startOffset, long length,
+ String clientName, AccessToken accessToken) throws IOException {
+ final Block block = new Block(blockId, 0 , blockGs);
OutputStream baseStream = NetUtils.getOutputStream(s,
datanode.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(
@@ -224,42 +212,24 @@
/**
* Write a block to disk.
- *
- * @param in The stream to read from
- * @throws IOException
*/
- private void writeBlock(DataInputStream in) throws IOException {
- DatanodeInfo srcDataNode = null;
- LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
- " tcp no delay " + s.getTcpNoDelay());
- //
- // Read in the header
- //
- Block block = new Block(in.readLong(),
- dataXceiverServer.estimateBlockSize, in.readLong());
+ @Override
+ public void opWriteBlock(DataInputStream in, long blockId, long blockGs,
+ int pipelineSize, boolean isRecovery,
+ String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
+ AccessToken accessToken) throws IOException {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
+ " tcp no delay " + s.getTcpNoDelay());
+ }
+
+ final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
+ blockGs);
LOG.info("Receiving block " + block +
" src: " + remoteAddress +
" dest: " + localAddress);
- int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
- boolean isRecovery = in.readBoolean(); // is this part of recovery?
- String client = Text.readString(in); // working on behalf of this client
- boolean hasSrcDataNode = in.readBoolean(); // is src node info present
- if (hasSrcDataNode) {
- srcDataNode = new DatanodeInfo();
- srcDataNode.readFields(in);
- }
- int numTargets = in.readInt();
- if (numTargets < 0) {
- throw new IOException("Mislabelled incoming datastream.");
- }
- DatanodeInfo targets[] = new DatanodeInfo[numTargets];
- for (int i = 0; i < targets.length; i++) {
- DatanodeInfo tmp = new DatanodeInfo();
- tmp.readFields(in);
- targets[i] = tmp;
- }
- AccessToken accessToken = new AccessToken();
- accessToken.readFields(in);
+
DataOutputStream replyOut = null; // stream to prev target
replyOut = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
@@ -302,9 +272,9 @@
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSock = datanode.newSocket();
try {
- int timeoutValue = numTargets * datanode.socketTimeout;
+ int timeoutValue = targets.length * datanode.socketTimeout;
int writeTimeout = datanode.socketWriteTimeout +
- (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
+ (HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
@@ -315,22 +285,9 @@
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
// Write header: Copied from DFSClient.java!
- mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
- mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );
- mirrorOut.writeLong( block.getBlockId() );
- mirrorOut.writeLong( block.getGenerationStamp() );
- mirrorOut.writeInt( pipelineSize );
- mirrorOut.writeBoolean( isRecovery );
- Text.writeString( mirrorOut, client );
- mirrorOut.writeBoolean(hasSrcDataNode);
- if (hasSrcDataNode) { // pass src node information
- srcDataNode.write(mirrorOut);
- }
- mirrorOut.writeInt( targets.length - 1 );
- for ( int i = 1; i < targets.length; i++ ) {
- targets[i].write( mirrorOut );
- }
- accessToken.write(mirrorOut);
+ DataTransferProtocol.Sender.opWriteBlock(mirrorOut,
+ block.getBlockId(), block.getGenerationStamp(), pipelineSize,
+ isRecovery, client, srcDataNode, targets, accessToken);
blockReceiver.writeChecksumHeader(mirrorOut);
mirrorOut.flush();
@@ -414,12 +371,11 @@
/**
* Get block checksum (MD5 of CRC32).
- * @param in
*/
- void getBlockChecksum(DataInputStream in) throws IOException {
- final Block block = new Block(in.readLong(), 0 , in.readLong());
- AccessToken accessToken = new AccessToken();
- accessToken.readFields(in);
+ @Override
+ public void opBlockChecksum(DataInputStream in,
+ long blockId, long blockGs, AccessToken accessToken) throws IOException {
+ final Block block = new Block(blockId, 0 , blockGs);
DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
datanode.socketWriteTimeout));
if (datanode.isAccessTokenEnabled
@@ -471,16 +427,12 @@
/**
* Read a block from the disk and then sends it to a destination.
- *
- * @param in The stream to read from
- * @throws IOException
*/
- private void copyBlock(DataInputStream in) throws IOException {
+ @Override
+ public void opCopyBlock(DataInputStream in,
+ long blockId, long blockGs, AccessToken accessToken) throws IOException {
// Read in the header
- long blockId = in.readLong(); // read block id
- Block block = new Block(blockId, 0, in.readLong());
- AccessToken accessToken = new AccessToken();
- accessToken.readFields(in);
+ Block block = new Block(blockId, 0, blockGs);
if (datanode.isAccessTokenEnabled
&& !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
AccessTokenHandler.AccessMode.COPY)) {
@@ -545,20 +497,14 @@
/**
* Receive a block and write it to disk, it then notifies the namenode to
* remove the copy from the source.
- *
- * @param in The stream to read from
- * @throws IOException
*/
- private void replaceBlock(DataInputStream in) throws IOException {
+ @Override
+ public void opReplaceBlock(DataInputStream in,
+ long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
+ AccessToken accessToken) throws IOException {
/* read header */
- long blockId = in.readLong();
- Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
- in.readLong()); // block id & generation stamp
- String sourceID = Text.readString(in); // read del hint
- DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
- proxySource.readFields(in);
- AccessToken accessToken = new AccessToken();
- accessToken.readFields(in);
+ final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
+ blockGs);
if (datanode.isAccessTokenEnabled
&& !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
AccessTokenHandler.AccessMode.REPLACE)) {
@@ -597,12 +543,8 @@
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
/* send request to the proxy */
- proxyOut.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // transfer version
- proxyOut.writeByte(DataTransferProtocol.OP_COPY_BLOCK); // op code
- proxyOut.writeLong(block.getBlockId()); // block id
- proxyOut.writeLong(block.getGenerationStamp()); // block id
- accessToken.write(proxyOut);
- proxyOut.flush();
+ DataTransferProtocol.Sender.opCopyBlock(proxyOut, block.getBlockId(),
+ block.getGenerationStamp(), accessToken);
// receive the response from the proxy
proxyReply = new DataInputStream(new BufferedInputStream(
|