hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r798233 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop...
Date: Mon, 27 Jul 2009 17:46:14 GMT
Author: szetszwo
Date: Mon Jul 27 17:46:13 2009
New Revision: 798233

URL: http://svn.apache.org/viewvc?rev=798233&view=rev
Log:
HDFS-501. Use enum to define the constants in DataTransferProtocol.
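
[Editor's note: in short, the patch replaces the raw byte opcode and short status constants with two self-serializing enums, DataTransferProtocol.Op and DataTransferProtocol.Status, each carrying its wire code and its own read/write methods; the old constants survive as deprecated aliases. A rough before/after sketch of the calling pattern, lifted from the DFSClient changes below (with Status.SUCCESS statically imported):

    // Before: raw shorts compared against integer constants.
    short status = in.readShort();
    if (status != DataTransferProtocol.OP_STATUS_SUCCESS) { /* handle error */ }

    // After: the enum decodes itself from the stream and compares by identity.
    DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
    if (status != SUCCESS) { /* handle error */ }
]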

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=798233&r1=798232&r2=798233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Mon Jul 27 17:46:13 2009
@@ -99,6 +99,9 @@
     HDFS-484. Fix bin-package and package target to package jar files.
     (gkesavan)
 
+    HDFS-501. Use enum to define the constants in DataTransferProtocol.
+    (szetszwo)
+
 Release 0.20.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=798233&r1=798232&r2=798233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Mon Jul 27 17:46:13 2009
@@ -17,42 +17,100 @@
  */
 package org.apache.hadoop.hdfs;
 
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.fs.*;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.BLOCK_CHECKSUM;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.CRC32;
+
+import javax.net.SocketFactory;
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
-import org.apache.hadoop.security.InvalidAccessTokenException;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.InvalidAccessTokenException;
 import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.util.*;
-
-import org.apache.commons.logging.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.zip.CRC32;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentHashMap;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-
-import javax.net.SocketFactory;
-import javax.security.auth.login.LoginException;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
@@ -619,15 +677,14 @@
         try {
           if (LOG.isDebugEnabled()) {
             LOG.debug("write to " + datanodes[j].getName() + ": "
-                + DataTransferProtocol.OP_BLOCK_CHECKSUM +
-                ", block=" + block);
+                + BLOCK_CHECKSUM + ", block=" + block);
           }
           DataTransferProtocol.Sender.opBlockChecksum(out, block.getBlockId(),
               block.getGenerationStamp(), lb.getAccessToken());
 
-          final short reply = in.readShort();
-          if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
-            if (reply == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN
+          final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);
+          if (reply != SUCCESS) {
+            if (reply == ERROR_ACCESS_TOKEN
                 && i > lastRetriedIndex) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
@@ -1353,9 +1410,9 @@
           new BufferedInputStream(NetUtils.getInputStream(sock), 
                                   bufferSize));
       
-      short status = in.readShort();
-      if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
-        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+      DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
+      if (status != SUCCESS) {
+        if (status == ERROR_ACCESS_TOKEN) {
           throw new InvalidAccessTokenException(
               "Got access token error in response to OP_READ_BLOCK "
                   + "for file " + file + " for block " + blockId);
@@ -1402,9 +1459,7 @@
     private void checksumOk(Socket sock) {
       try {
         OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
-        byte buf[] = { (DataTransferProtocol.OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
-                       (DataTransferProtocol.OP_STATUS_CHECKSUM_OK) & 0xff };
-        out.write(buf);
+        CHECKSUM_OK.writeOutputStream(out);
         out.flush();
       } catch (IOException e) {
         // its ok not to be able to send this.
@@ -2476,8 +2531,9 @@
 
               // processes response status from all datanodes.
               for (int i = 0; i < targets.length && clientRunning; i++) {
-                short reply = blockReplyStream.readShort();
-                if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
+                final DataTransferProtocol.Status reply
+                    = DataTransferProtocol.Status.read(blockReplyStream);
+                if (reply != SUCCESS) {
                   errorIndex = i; // first bad datanode
                   throw new IOException("Bad response " + reply +
                       " for block " + block +
@@ -2716,7 +2772,7 @@
       //
       private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
           boolean recoveryFlag) {
-        short pipelineStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
+        DataTransferProtocol.Status pipelineStatus = SUCCESS;
         String firstBadLink = "";
         if (LOG.isDebugEnabled()) {
           for (int i = 0; i < nodes.length; i++) {
@@ -2755,10 +2811,10 @@
           out.flush();
 
           // receive ack for connect
-          pipelineStatus = blockReplyStream.readShort();
+          pipelineStatus = DataTransferProtocol.Status.read(blockReplyStream);
           firstBadLink = Text.readString(blockReplyStream);
-          if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
-            if (pipelineStatus == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+          if (pipelineStatus != SUCCESS) {
+            if (pipelineStatus == ERROR_ACCESS_TOKEN) {
               throw new InvalidAccessTokenException(
                   "Got access token error for connect ack with firstBadLink as "
                       + firstBadLink);
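
[Editor's note: one subtle point in the checksumOk change above: the old code hand-encoded OP_STATUS_CHECKSUM_OK as a big-endian byte pair, and the new Status.writeOutputStream helper (defined in the DataTransferProtocol diff below) emits the same two bytes, so the wire format is unchanged. A quick standalone check, assuming nothing beyond java.io (class name is illustrative, not from the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    public class WireEncodingCheck {
      public static void main(String[] args) throws IOException {
        int code = 6;  // CHECKSUM_OK
        ByteArrayOutputStream shortBuf = new ByteArrayOutputStream();
        new DataOutputStream(shortBuf).writeShort(code);  // writeShort is big-endian
        byte[] manual = { (byte)((code >>> 8) & 0xff), (byte)(code & 0xff) };
        System.out.println(Arrays.equals(shortBuf.toByteArray(), manual));  // true
      }
    }
]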

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=798233&r1=798232&r2=798233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Mon Jul 27 17:46:13 2009
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessToken;
@@ -42,39 +45,136 @@
    */
   public static final int DATA_TRANSFER_VERSION = 16;
 
-  // Processed at datanode stream-handler
-  public static final byte OP_WRITE_BLOCK = (byte) 80;
-  public static final byte OP_READ_BLOCK = (byte) 81;
-  /**
-   * @deprecated As of version 15, OP_READ_METADATA is no longer supported
-   */
-  @Deprecated public static final byte OP_READ_METADATA = (byte) 82;
-  public static final byte OP_REPLACE_BLOCK = (byte) 83;
-  public static final byte OP_COPY_BLOCK = (byte) 84;
-  public static final byte OP_BLOCK_CHECKSUM = (byte) 85;
+  /** Operation */
+  public enum Op {
+    WRITE_BLOCK((byte)80),
+    READ_BLOCK((byte)81),
+    READ_METADATA((byte)82),
+    REPLACE_BLOCK((byte)83),
+    COPY_BLOCK((byte)84),
+    BLOCK_CHECKSUM((byte)85);
+
+    /** The code for this operation. */
+    public final byte code;
+    
+    private Op(byte code) {
+      this.code = code;
+    }
+    
+    private static final int FIRST_CODE = values()[0].code;
+    /** Return the object represented by the code. */
+    private static Op valueOf(byte code) {
+      final int i = (code & 0xff) - FIRST_CODE;
+      return i < 0 || i >= values().length? null: values()[i];
+    }
+
+    /** Read from in */
+    public static Op read(DataInput in) throws IOException {
+      return valueOf(in.readByte());
+    }
+
+    /** Write to out */
+    public void write(DataOutput out) throws IOException {
+      out.write(code);
+    }
+  };
+
+  /** Status */
+  public enum Status {
+    SUCCESS(0),
+    ERROR(1),
+    ERROR_CHECKSUM(2),
+    ERROR_INVALID(3),
+    ERROR_EXISTS(4),
+    ERROR_ACCESS_TOKEN(5),
+    CHECKSUM_OK(6);
+
+    /** The code for this operation. */
+    private final int code;
+    
+    private Status(int code) {
+      this.code = code;
+    }
+
+    private static final int FIRST_CODE = values()[0].code;
+    /** Return the object represented by the code. */
+    private static Status valueOf(int code) {
+      final int i = code - FIRST_CODE;
+      return i < 0 || i >= values().length? null: values()[i];
+    }
+
+    /** Read from in */
+    public static Status read(DataInput in) throws IOException {
+      return valueOf(in.readShort());
+    }
+
+    /** Write to out */
+    public void write(DataOutput out) throws IOException {
+      out.writeShort(code);
+    }
+
+    /** Write to out */
+    public void writeOutputStream(OutputStream out) throws IOException {
+      out.write(new byte[] {(byte)(code >>> 8), (byte)code});
+    }
+  };
   
-  public static final int OP_STATUS_SUCCESS = 0;  
-  public static final int OP_STATUS_ERROR = 1;  
-  public static final int OP_STATUS_ERROR_CHECKSUM = 2;  
-  public static final int OP_STATUS_ERROR_INVALID = 3;  
-  public static final int OP_STATUS_ERROR_EXISTS = 4;  
-  public static final int OP_STATUS_ERROR_ACCESS_TOKEN = 5;
-  public static final int OP_STATUS_CHECKSUM_OK = 6;
+  /** @deprecated Deprecated at 0.21.  Use Op.WRITE_BLOCK instead. */
+  @Deprecated
+  public static final byte OP_WRITE_BLOCK = Op.WRITE_BLOCK.code;
+  /** @deprecated Deprecated at 0.21.  Use Op.READ_BLOCK instead. */
+  @Deprecated
+  public static final byte OP_READ_BLOCK = Op.READ_BLOCK.code;
+  /** @deprecated As of version 15, OP_READ_METADATA is no longer supported. */
+  @Deprecated
+  public static final byte OP_READ_METADATA = Op.READ_METADATA.code;
+  /** @deprecated Deprecated at 0.21.  Use Op.REPLACE_BLOCK instead. */
+  @Deprecated
+  public static final byte OP_REPLACE_BLOCK = Op.REPLACE_BLOCK.code;
+  /** @deprecated Deprecated at 0.21.  Use Op.COPY_BLOCK instead. */
+  @Deprecated
+  public static final byte OP_COPY_BLOCK = Op.COPY_BLOCK.code;
+  /** @deprecated Deprecated at 0.21.  Use Op.BLOCK_CHECKSUM instead. */
+  @Deprecated
+  public static final byte OP_BLOCK_CHECKSUM = Op.BLOCK_CHECKSUM.code;
+
+
+  /** @deprecated Deprecated at 0.21.  Use Status.SUCCESS instead. */
+  @Deprecated
+  public static final int OP_STATUS_SUCCESS = Status.SUCCESS.code;  
+  /** @deprecated Deprecated at 0.21.  Use Status.ERROR instead. */
+  @Deprecated
+  public static final int OP_STATUS_ERROR = Status.ERROR.code;
+  /** @deprecated Deprecated at 0.21.  Use Status.ERROR_CHECKSUM instead. */
+  @Deprecated
+  public static final int OP_STATUS_ERROR_CHECKSUM = Status.ERROR_CHECKSUM.code;
+  /** @deprecated Deprecated at 0.21.  Use Status.ERROR_INVALID instead. */
+  @Deprecated
+  public static final int OP_STATUS_ERROR_INVALID = Status.ERROR_INVALID.code;
+  /** @deprecated Deprecated at 0.21.  Use Status.ERROR_EXISTS instead. */
+  @Deprecated
+  public static final int OP_STATUS_ERROR_EXISTS = Status.ERROR_EXISTS.code;
+  /** @deprecated Deprecated at 0.21.  Use Status.ERROR_ACCESS_TOKEN instead.*/
+  @Deprecated
+  public static final int OP_STATUS_ERROR_ACCESS_TOKEN = Status.ERROR_ACCESS_TOKEN.code;
+  /** @deprecated Deprecated at 0.21.  Use Status.CHECKSUM_OK instead. */
+  @Deprecated
+  public static final int OP_STATUS_CHECKSUM_OK = Status.CHECKSUM_OK.code;
 
 
   /** Sender */
   public static class Sender {
     /** Initialize a operation. */
-    public static void op(DataOutputStream out, int op) throws IOException {
+    public static void op(DataOutputStream out, Op op) throws IOException {
       out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-      out.write(op);
+      op.write(out);
     }
 
     /** Send OP_READ_BLOCK */
     public static void opReadBlock(DataOutputStream out,
         long blockId, long blockGs, long blockOffset, long blockLen,
         String clientName, AccessToken accessToken) throws IOException {
-      op(out, OP_READ_BLOCK);
+      op(out, Op.READ_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
@@ -90,7 +190,7 @@
         long blockId, long blockGs, int pipelineSize, boolean isRecovery,
         String client, DatanodeInfo src, DatanodeInfo[] targets,
         AccessToken accesstoken) throws IOException {
-      op(out, OP_WRITE_BLOCK);
+      op(out, Op.WRITE_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
@@ -114,7 +214,7 @@
     public static void opReplaceBlock(DataOutputStream out,
         long blockId, long blockGs, String storageId, DatanodeInfo src,
         AccessToken accesstoken) throws IOException {
-      op(out, OP_REPLACE_BLOCK);
+      op(out, Op.REPLACE_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
@@ -127,7 +227,7 @@
     /** Send OP_COPY_BLOCK */
     public static void opCopyBlock(DataOutputStream out,
         long blockId, long blockGs, AccessToken accesstoken) throws IOException {
-      op(out, OP_COPY_BLOCK);
+      op(out, Op.COPY_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
@@ -138,7 +238,7 @@
     /** Send OP_BLOCK_CHECKSUM */
     public static void opBlockChecksum(DataOutputStream out,
         long blockId, long blockGs, AccessToken accesstoken) throws IOException {
-      op(out, OP_BLOCK_CHECKSUM);
+      op(out, Op.BLOCK_CHECKSUM);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
@@ -150,12 +250,12 @@
   /** Receiver */
   public static abstract class Receiver {
     /** Initialize a operation. */
-    public final byte op(DataInputStream in) throws IOException {
+    public final Op op(DataInputStream in) throws IOException {
       final short version = in.readShort();
       if (version != DATA_TRANSFER_VERSION) {
         throw new IOException( "Version Mismatch" );
       }
-      return in.readByte();
+      return Op.read(in);
     }
 
     /** Receive OP_READ_BLOCK */
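
[Editor's note: the valueOf decoding in both enums above indexes values() by (code - FIRST_CODE), which assumes the codes are contiguous and declared in ascending order (80-85 for Op, 0-6 for Status); an unrecognized code decodes to null rather than throwing. A minimal round-trip sketch using the new read/write methods (the harness class is illustrative, not part of the patch):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;

    public class StatusRoundTrip {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        Status.ERROR_ACCESS_TOKEN.write(new DataOutputStream(buf));  // two-byte short, 0x0005
        Status echoed = Status.read(
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(echoed);  // ERROR_ACCESS_TOKEN; an unknown code would yield null
      }
    }
]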

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=798233&r1=798232&r2=798233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Jul 27 17:46:13 2009
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -376,9 +378,9 @@
     
     /* Receive a block copy response from the input stream */ 
     private void receiveResponse(DataInputStream in) throws IOException {
-      short status = in.readShort();
-      if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
-        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN)
+      DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
+      if (status != DataTransferProtocol.Status.SUCCESS) {
+        if (status == ERROR_ACCESS_TOKEN)
           throw new IOException("block move failed due to access token error");
         throw new IOException("block move is failed");
       }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=798233&r1=798232&r2=798233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Jul 27 17:46:13 2009
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
+
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -36,11 +40,11 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
-import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 /** A class that receives a block and writes to its own disk, meanwhile
  * may copies it to another site. If a throttler is provided,
@@ -823,7 +827,7 @@
             }
 
             replyOut.writeLong(expected);
-            replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+            SUCCESS.write(replyOut);
             replyOut.flush();
         } catch (Exception e) {
           if (running) {
@@ -854,7 +858,7 @@
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
         try {
-            short op = DataTransferProtocol.OP_STATUS_SUCCESS;
+            DataTransferProtocol.Status op = SUCCESS;
             boolean didRead = false;
             long expected = -2;
             try { 
@@ -919,7 +923,7 @@
             }
             
             if (!didRead) {
-              op = DataTransferProtocol.OP_STATUS_ERROR;
+              op = ERROR;
             }
             
             // If this is the last packet in block, then close block
@@ -948,7 +952,7 @@
 
             // send my status back to upstream datanode
             replyOut.writeLong(expected); // send seqno upstream
-            replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+            SUCCESS.write(replyOut);
 
             LOG.debug("PacketResponder " + numTargets + 
                       " for block " + block +
@@ -958,18 +962,18 @@
             // forward responses from downstream datanodes.
             for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
               try {
-                if (op == DataTransferProtocol.OP_STATUS_SUCCESS) {
-                  op = mirrorIn.readShort();
-                  if (op != DataTransferProtocol.OP_STATUS_SUCCESS) {
+                if (op == SUCCESS) {
+                  op = Status.read(mirrorIn);
+                  if (op != SUCCESS) {
                     LOG.debug("PacketResponder for block " + block +
                               ": error code received from downstream " +
                               " datanode[" + i + "] " + op);
                   }
                 }
               } catch (Throwable e) {
-                op = DataTransferProtocol.OP_STATUS_ERROR;
+                op = ERROR;
               }
-              replyOut.writeShort(op);
+              op.write(replyOut);
             }
             replyOut.flush();
             LOG.debug("PacketResponder " + block + " " + numTargets + 
@@ -982,7 +986,7 @@
             // If we forwarded an error response from a downstream datanode
             // and we are acting on behalf of a client, then we quit. The 
             // client will drive the recovery mechanism.
-            if (op == DataTransferProtocol.OP_STATUS_ERROR && receiver.clientName.length() > 0) {
+            if (op == ERROR && receiver.clientName.length() > 0) {
               running = false;
             }
         } catch (IOException e) {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=798233&r1=798232&r2=798233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Jul 27 17:46:13 2009
@@ -17,6 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -42,7 +48,6 @@
 import org.apache.hadoop.security.AccessTokenHandler;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
-import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 /**
  * Thread for processing incoming/outgoing data stream.
@@ -79,7 +84,7 @@
       in = new DataInputStream(
           new BufferedInputStream(NetUtils.getInputStream(s), 
                                   SMALL_BUFFER_SIZE));
-      final byte op = op(in);
+      final DataTransferProtocol.Op op = op(in);
       boolean local = s.getInetAddress().equals(s.getLocalAddress());
       // Make sure the xciver count is not exceeded
       int curXceiverCount = datanode.getXceiverCount();
@@ -90,7 +95,7 @@
       }
       long startTime = DataNode.now();
       switch ( op ) {
-      case DataTransferProtocol.OP_READ_BLOCK:
+      case READ_BLOCK:
         opReadBlock(in);
         datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
         if (local)
@@ -98,7 +103,7 @@
         else
           datanode.myMetrics.readsFromRemoteClient.inc();
         break;
-      case DataTransferProtocol.OP_WRITE_BLOCK:
+      case WRITE_BLOCK:
         opWriteBlock(in);
         datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
         if (local)
@@ -106,16 +111,16 @@
         else
           datanode.myMetrics.writesFromRemoteClient.inc();
         break;
-      case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
+      case REPLACE_BLOCK: // for balancing purpose; send to a destination
         opReplaceBlock(in);
         datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
         break;
-      case DataTransferProtocol.OP_COPY_BLOCK:
+      case COPY_BLOCK:
             // for balancing purpose; send to a proxy source
         opCopyBlock(in);
         datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
         break;
-      case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
+      case BLOCK_CHECKSUM: //get the checksum of a block
         opBlockChecksum(in);
         datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
         break;
@@ -150,7 +155,7 @@
         && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
             AccessTokenHandler.AccessMode.READ)) {
       try {
-        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+        ERROR_ACCESS_TOKEN.write(out);
         out.flush();
         throw new IOException("Access token verification failed, on client "
             + "request for reading block " + block);
@@ -172,19 +177,19 @@
         blockSender = new BlockSender(block, startOffset, length,
             true, true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
-        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
+        ERROR.write(out);
         throw e;
       }
 
-      out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
+      SUCCESS.write(out); // send op status
       long read = blockSender.sendBlock(out, baseStream, null); // send data
 
       if (blockSender.isBlockReadFully()) {
         // See if client verification succeeded. 
         // This is an optional response from client.
         try {
-          if (in.readShort() == DataTransferProtocol.OP_STATUS_CHECKSUM_OK  && 
-              datanode.blockScanner != null) {
+          if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK
+              && datanode.blockScanner != null) {
             datanode.blockScanner.verifiedByClient(block);
           }
         } catch (IOException ignored) {}
@@ -238,7 +243,7 @@
             .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
       try {
         if (client.length() != 0) {
-          replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+          ERROR_ACCESS_TOKEN.write(replyOut);
           Text.writeString(replyOut, datanode.dnRegistration.getName());
           replyOut.flush();
         }
@@ -255,7 +260,7 @@
     BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
-    short mirrorInStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
+    DataTransferProtocol.Status mirrorInStatus = SUCCESS;
     try {
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(block, in, 
@@ -296,9 +301,9 @@
 
           // read connect ack (only for clients, not for replication req)
           if (client.length() != 0) {
-            mirrorInStatus = mirrorIn.readShort();
+            mirrorInStatus = DataTransferProtocol.Status.read(mirrorIn);
             firstBadLink = Text.readString(mirrorIn);
-            if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
+            if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
               LOG.info("Datanode " + targets.length +
                        " got response for connect ack " +
                        " from downstream datanode with firstbadlink as " +
@@ -308,7 +313,7 @@
 
         } catch (IOException e) {
           if (client.length() != 0) {
-            replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
+            ERROR.write(replyOut);
             Text.writeString(replyOut, mirrorNode);
             replyOut.flush();
           }
@@ -331,12 +336,12 @@
 
       // send connect ack back to source (only for clients)
       if (client.length() != 0) {
-        if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
+        if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
           LOG.info("Datanode " + targets.length +
                    " forwarding connect ack to upstream firstbadlink is " +
                    firstBadLink);
         }
-        replyOut.writeShort(mirrorInStatus);
+        mirrorInStatus.write(replyOut);
         Text.writeString(replyOut, firstBadLink);
         replyOut.flush();
       }
@@ -387,7 +392,7 @@
         && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
             .getBlockId(), AccessTokenHandler.AccessMode.READ)) {
       try {
-        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+        ERROR_ACCESS_TOKEN.write(out);
         out.flush();
         throw new IOException(
             "Access token verification failed, on getBlockChecksum() "
@@ -418,7 +423,7 @@
       }
 
       //write reply
-      out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+      SUCCESS.write(out);
       out.writeInt(bytesPerCRC);
       out.writeLong(crcPerBlock);
       md5.write(out);
@@ -443,17 +448,14 @@
             AccessTokenHandler.AccessMode.COPY)) {
       LOG.warn("Invalid access token in request from "
           + s.getRemoteSocketAddress() + " for copying block " + block);
-      sendResponse(s,
-          (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
-          datanode.socketWriteTimeout);
+      sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
       return;
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       LOG.info("Not able to copy block " + blockId + " to " 
           + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
-      sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR, 
-          datanode.socketWriteTimeout);
+      sendResponse(s, ERROR, datanode.socketWriteTimeout);
       return;
     }
 
@@ -473,7 +475,7 @@
           baseStream, SMALL_BUFFER_SIZE));
 
       // send status first
-      reply.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
+      SUCCESS.write(reply);
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream, 
                                         dataXceiverServer.balanceThrottler);
@@ -515,22 +517,20 @@
             AccessTokenHandler.AccessMode.REPLACE)) {
       LOG.warn("Invalid access token in request from "
           + s.getRemoteSocketAddress() + " for replacing block " + block);
-      sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
-          datanode.socketWriteTimeout);
+      sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
       return;
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       LOG.warn("Not able to receive block " + blockId + " from " 
           + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
-      sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR, 
-          datanode.socketWriteTimeout);
+      sendResponse(s, ERROR, datanode.socketWriteTimeout);
       return;
     }
 
     Socket proxySock = null;
     DataOutputStream proxyOut = null;
-    short opStatus = DataTransferProtocol.OP_STATUS_SUCCESS;
+    DataTransferProtocol.Status opStatus = SUCCESS;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
     
@@ -554,9 +554,10 @@
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(
           NetUtils.getInputStream(proxySock), BUFFER_SIZE));
-      short status = proxyReply.readShort();
-      if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
-        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+      final DataTransferProtocol.Status status
+          = DataTransferProtocol.Status.read(proxyReply);
+      if (status != SUCCESS) {
+        if (status == ERROR_ACCESS_TOKEN) {
           throw new IOException("Copy block " + block + " from "
               + proxySock.getRemoteSocketAddress()
               + " failed due to access token error");
@@ -581,11 +582,11 @@
           " from " + s.getRemoteSocketAddress());
       
     } catch (IOException ioe) {
-      opStatus = DataTransferProtocol.OP_STATUS_ERROR;
+      opStatus = ERROR;
       throw ioe;
     } finally {
       // receive the last byte that indicates the proxy released its thread resource
-      if (opStatus == DataTransferProtocol.OP_STATUS_SUCCESS) {
+      if (opStatus == SUCCESS) {
         try {
           proxyReply.readChar();
         } catch (IOException ignored) {
@@ -613,12 +614,12 @@
    * @param opStatus status message to write
    * @param timeout send timeout
    **/
-  private void sendResponse(Socket s, short opStatus, long timeout) 
-                                                       throws IOException {
+  private void sendResponse(Socket s, DataTransferProtocol.Status opStatus,
+      long timeout) throws IOException {
     DataOutputStream reply = 
       new DataOutputStream(NetUtils.getOutputStream(s, timeout));
     try {
-      reply.writeShort(opStatus);
+      opStatus.write(reply);
       reply.flush();
     } finally {
       IOUtils.closeStream(reply);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=798233&r1=798232&r2=798233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Mon Jul 27 17:46:13 2009
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.READ_BLOCK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -168,13 +173,13 @@
     // bad ops
     sendBuf.reset();
     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    sendOut.writeByte(DataTransferProtocol.OP_WRITE_BLOCK - 1);
+    sendOut.writeByte(WRITE_BLOCK.code - 1);
     sendRecvData("Wrong Op Code", true);
     
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    sendOut.writeByte(DataTransferProtocol.OP_WRITE_BLOCK);
+    WRITE_BLOCK.write(sendOut);
     sendOut.writeLong(newBlockId); // block id
     sendOut.writeLong(0);          // generation stamp
     sendOut.writeInt(0);           // targets in pipeline 
@@ -188,13 +193,13 @@
     // bad bytes per checksum
     sendOut.writeInt(-1-random.nextInt(oneMil));
     recvBuf.reset();
-    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
+    ERROR.write(recvOut);
     sendRecvData("wrong bytesPerChecksum while writing", true);
 
     sendBuf.reset();
     recvBuf.reset();
     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    sendOut.writeByte(DataTransferProtocol.OP_WRITE_BLOCK);
+    WRITE_BLOCK.write(sendOut);
     sendOut.writeLong(newBlockId);
     sendOut.writeLong(0);          // generation stamp
     sendOut.writeInt(0);           // targets in pipeline 
@@ -204,13 +209,13 @@
 
     // bad number of targets
     sendOut.writeInt(-1-random.nextInt(oneMil));
-    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
+    ERROR.write(recvOut);
     sendRecvData("bad targets len while writing block " + newBlockId, true);
 
     sendBuf.reset();
     recvBuf.reset();
     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    sendOut.writeByte(DataTransferProtocol.OP_WRITE_BLOCK);
+    WRITE_BLOCK.write(sendOut);
     sendOut.writeLong(++newBlockId);
     sendOut.writeLong(0);          // generation stamp
     sendOut.writeInt(0);           // targets in pipeline 
@@ -228,10 +233,10 @@
     
     // bad data chunk length
     sendOut.writeInt(-1-random.nextInt(oneMil));
-    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
+    SUCCESS.write(recvOut);
     Text.writeString(recvOut, ""); // first bad node
     recvOut.writeLong(100);        // sequencenumber
-    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
+    ERROR.write(recvOut);
     sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId, 
                  true);
 
@@ -239,7 +244,7 @@
     sendBuf.reset();
     recvBuf.reset();
     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    sendOut.writeByte(DataTransferProtocol.OP_WRITE_BLOCK);
+    WRITE_BLOCK.write(sendOut);
     sendOut.writeLong(++newBlockId);
     sendOut.writeLong(0);          // generation stamp
     sendOut.writeInt(0);           // targets in pipeline 
@@ -258,10 +263,10 @@
     sendOut.writeInt(0);           // chunk length
     sendOut.writeInt(0);           // zero checksum
     //ok finally write a block with 0 len
-    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
+    SUCCESS.write(recvOut);
     Text.writeString(recvOut, ""); // first bad node
     recvOut.writeLong(100);        // sequencenumber
-    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
+    SUCCESS.write(recvOut);
     sendRecvData("Writing a zero len block blockid " + newBlockId, false);
     
     /* Test OP_READ_BLOCK */
@@ -270,13 +275,13 @@
     sendBuf.reset();
     recvBuf.reset();
     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    sendOut.writeByte(DataTransferProtocol.OP_READ_BLOCK);
+    READ_BLOCK.write(sendOut);
     newBlockId = firstBlock.getBlockId()-1;
     sendOut.writeLong(newBlockId);
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(0L);
     sendOut.writeLong(fileLen);
-    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
+    ERROR.write(recvOut);
     Text.writeString(sendOut, "cl");
     AccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
@@ -284,7 +289,7 @@
     // negative block start offset
     sendBuf.reset();
     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    sendOut.writeByte(DataTransferProtocol.OP_READ_BLOCK);
+    READ_BLOCK.write(sendOut);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(-1L);
@@ -297,7 +302,7 @@
     // bad block start offset
     sendBuf.reset();
     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    sendOut.writeByte(DataTransferProtocol.OP_READ_BLOCK);
+    READ_BLOCK.write(sendOut);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(fileLen);
@@ -309,10 +314,10 @@
     
     // negative length is ok. Datanode assumes we want to read the whole block.
     recvBuf.reset();
-    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);    
+    SUCCESS.write(recvOut);    
     sendBuf.reset();
     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    sendOut.writeByte(DataTransferProtocol.OP_READ_BLOCK);
+    READ_BLOCK.write(sendOut);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(0);
@@ -324,10 +329,10 @@
     
     // length is more than size of block.
     recvBuf.reset();
-    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);    
+    ERROR.write(recvOut);    
     sendBuf.reset();
     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    sendOut.writeByte(DataTransferProtocol.OP_READ_BLOCK);
+    READ_BLOCK.write(sendOut);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(0);
@@ -340,7 +345,7 @@
     //At the end of all this, read the file to make sure that succeeds finally.
     sendBuf.reset();
     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    sendOut.writeByte(DataTransferProtocol.OP_READ_BLOCK);
+    READ_BLOCK.write(sendOut);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(0);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java?rev=798233&r1=798232&r2=798233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java Mon Jul 27 17:46:13 2009
@@ -17,26 +17,30 @@
  */
 package org.apache.hadoop.hdfs;
 
-import junit.framework.TestCase;
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.Random;
-import java.net.*;
+
+import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.BlockLocation;
 
 /**
  * This class tests the replication of a DFS file.

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=798233&r1=798232&r2=798233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Mon Jul 27 17:46:13 2009
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.REPLACE_BLOCK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.*;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -44,7 +47,6 @@
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.datanode.BlockTransferThrottler;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessToken;
@@ -227,7 +229,7 @@
     // sendRequest
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
     out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-    out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);
+    REPLACE_BLOCK.write(out);
     out.writeLong(block.getBlockId());
     out.writeLong(block.getGenerationStamp());
     Text.writeString(out, source.getStorageID());
@@ -237,11 +239,7 @@
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());
 
-    short status = reply.readShort();
-    if(status == DataTransferProtocol.OP_STATUS_SUCCESS) {
-      return true;
-    }
-    return false;
+    return DataTransferProtocol.Status.read(reply) == SUCCESS;
   }
 
   /**

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=798233&r1=798232&r2=798233&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Mon Jul 27 17:46:13 2009
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
+
 import java.io.DataOutputStream;
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 
+import junit.framework.TestCase;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,8 +37,6 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessToken;
 
-import junit.framework.TestCase;
-
 /** Test if a datanode can correctly handle errors during block read/write*/
 public class TestDiskError extends TestCase {
   public void testShutdown() throws Exception {
@@ -112,7 +114,7 @@
           s.getOutputStream());
 
       out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
-      out.write( DataTransferProtocol.OP_WRITE_BLOCK );
+      WRITE_BLOCK.write(out);
       out.writeLong( block.getBlock().getBlockId());
       out.writeLong( block.getBlock().getGenerationStamp() );
       out.writeInt(1);


