hadoop-common-commits mailing list archives

From hair...@apache.org
Subject svn commit: r708637 - in /hadoop/core/branches/branch-0.18: ./ src/hdfs/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Date Tue, 28 Oct 2008 19:13:40 GMT
Author: hairong
Date: Tue Oct 28 12:13:39 2008
New Revision: 708637

URL: http://svn.apache.org/viewvc?rev=708637&view=rev
Log:
Revert the patch for HADOOP-4116 on branch 0.18 because it caused an incompatibility
between 0.18.1 and 0.18.2.

Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java
    hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSConstants.java
    hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=708637&r1=708636&r2=708637&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Tue Oct 28 12:13:39 2008
@@ -4,8 +4,6 @@
 
   BUG FIXES
 
-    HADOOP-4116. Balancer should provide better resource management. (hairong)
-
     HADOOP-3614. Fix a bug that Datanode may use an old GenerationStamp to get
     meta file. (szetszwo)
 

Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java?rev=708637&r1=708636&r2=708637&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/Balancer.java Tue Oct 28 12:13:39 2008
@@ -28,6 +28,7 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -173,11 +174,6 @@
     LogFactory.getLog("org.apache.hadoop.dfs.Balancer");
   final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
 
-  /** The maximum number of concurrent blocks moves for 
-   * balancing purpose at a datanode
-   */
-  public static final int MAX_NUM_CONCURRENT_MOVES = 5;
-  
   private Configuration conf;
 
   private double threshold = 10D;
@@ -212,10 +208,10 @@
   
   private double avgUtilization = 0.0D;
   
-  final static private int MOVER_THREAD_POOL_SIZE = 1000;
+  final private int MOVER_THREAD_POOL_SIZE = 1000;
   final private ExecutorService moverExecutor = 
     Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
-  final static private int DISPATCHER_THREAD_POOL_SIZE = 200;
+  final private int DISPATCHER_THREAD_POOL_SIZE = 200;
   final private ExecutorService dispatcherExecutor =
     Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
   
@@ -260,13 +256,11 @@
             this.block = block;
             if ( chooseProxySource() ) {
               addToMoved(block);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Decided to move block "+ block.getBlockId()
-                    +" with a length of "+FsShell.byteDesc(block.getNumBytes())
-                    + " bytes from " + source.getName() 
-                    + " to " + target.getName()
-                    + " using proxy source " + proxySource.getName() );
-              }
+              LOG.info("Decided to move block "+ block.getBlockId()
+                  +" with a length of "+FsShell.byteDesc(block.getNumBytes())
+                  + " bytes from " + source.getName() 
+                  + " to " + target.getName()
+                  + " using proxy source " + proxySource.getName() );
               return true;
             }
           }
@@ -307,8 +301,10 @@
       DataInputStream in = null;
       try {
         sock.connect(DataNode.createSocketAddr(
-            target.datanode.getName()), FSConstants.READ_TIMEOUT);
-        sock.setKeepAlive(true);
+            proxySource.datanode.getName()), FSConstants.READ_TIMEOUT);
+        long bandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
+        sock.setSoTimeout(2*FSConstants.READ_TIMEOUT+
+            (int)(block.getNumBytes()*1500/bandwidth));
         out = new DataOutputStream( new BufferedOutputStream(
             sock.getOutputStream(), FSConstants.BUFFER_SIZE));
         sendRequest(out);
@@ -316,17 +312,25 @@
             sock.getInputStream(), FSConstants.BUFFER_SIZE));
         receiveResponse(in);
         bytesMoved.inc(block.getNumBytes());
-        LOG.info( "Moving block " + block.getBlock().getBlockId() +
+        if (LOG.isDebugEnabled()) {
+          LOG.debug( "Moving block " + block.getBlock().getBlockId() +
               " from "+ source.getName() + " to " +
               target.getName() + " through " +
               proxySource.getName() +
-              " is succeeded." );
+              " succeeded." );
+        }
+      } catch (SocketTimeoutException te) { 
+        LOG.warn("Timeout moving block "+block.getBlockId()+
+            " from " + source.getName() + " to " +
+            target.getName() + " through " +
+            proxySource.getName());
       } catch (IOException e) {
         LOG.warn("Error moving block "+block.getBlockId()+
             " from " + source.getName() + " to " +
             target.getName() + " through " +
             proxySource.getName() +
-            ": "+e.getMessage());
+            ": "+e.getMessage()+ "\n" +
+            StringUtils.stringifyException(e) );
       } finally {
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
@@ -349,11 +353,11 @@
     /* Send a block copy request to the outputstream*/
     private void sendRequest(DataOutputStream out) throws IOException {
       out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
-      out.writeByte(FSConstants.OP_REPLACE_BLOCK);
+      out.writeByte(FSConstants.OP_COPY_BLOCK);
       out.writeLong(block.getBlock().getBlockId());
       out.writeLong(block.getBlock().getGenerationStamp());
       Text.writeString(out, source.getStorageID());
-      proxySource.write(out);
+      target.write(out);
       out.flush();
     }
     
@@ -361,7 +365,11 @@
     private void receiveResponse(DataInputStream in) throws IOException {
       short status = in.readShort();
       if (status != FSConstants.OP_STATUS_SUCCESS) {
-        throw new IOException("block move is failed.");
+        throw new IOException("Moving block "+block.getBlockId()+
+            " from "+source.getName() + " to " +
+            target.getName() + " through " +
+            proxySource.getName() +
+        "failed");
       }
     }
 
@@ -377,10 +385,8 @@
     private void scheduleBlockMove() {
       moverExecutor.execute(new Runnable() {
         public void run() {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Starting moving "+ block.getBlockId() +
-                " from " + proxySource.getName() + " to " + target.getName());
-          }
+          LOG.info("Starting moving "+ block.getBlockId() +
+              " from " + proxySource.getName() + " to " + target.getName());
           dispatch();
         }
       });
@@ -470,6 +476,8 @@
   /* A class that keeps track of a datanode in Balancer */
   private static class BalancerDatanode implements Writable {
     final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
+    final protected static short MAX_NUM_CONCURRENT_MOVES =
+      DataNode.MAX_BALANCING_THREADS;
     protected DatanodeInfo datanode;
     private double utilization;
     protected long maxSizeToMove;
@@ -906,9 +914,6 @@
     // compute average utilization
     long totalCapacity=0L, totalUsedSpace=0L;
     for (DatanodeInfo datanode : datanodes) {
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
-        continue; // ignore decommissioning or decommissioned nodes
-      }
       totalCapacity += datanode.getCapacity();
       totalUsedSpace += datanode.getDfsUsed();
     }
@@ -922,9 +927,6 @@
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
     shuffleArray(datanodes);
     for (DatanodeInfo datanode : datanodes) {
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
-        continue; // ignore decommissioning or decommissioned nodes
-      }
       cluster.add(datanode);
       BalancerDatanode datanodeS;
       if (getUtilization(datanode) > avgUtilization) {

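The restored Balancer.dispatch() code above sizes the proxy socket's read timeout from the
block length and the configured balancing bandwidth: twice READ_TIMEOUT plus roughly 1.5x
the estimated transfer time in milliseconds (numBytes * 1500 / bandwidth). A worked sketch
of that arithmetic, assuming READ_TIMEOUT is 60000 ms (an assumption; the constant's value
is not part of this diff) and the 1 MB/s default for dfs.balance.bandwidthPerSec shown above:

    // Worked example of the restored timeout formula in Balancer.dispatch():
    //   soTimeout = 2 * READ_TIMEOUT + numBytes * 1500 / bandwidth
    // The factor 1500 is the transfer-time estimate in milliseconds with a
    // 50% safety margin (1000 ms/s * 1.5). READ_TIMEOUT = 60000 ms is an
    // assumed value; the 1 MB/s bandwidth default comes from the diff above.
    public class BalancerTimeoutExample {
      public static void main(String[] args) {
        final int READ_TIMEOUT = 60 * 1000;      // ms (assumed value)
        final long bandwidth = 1024L * 1024;     // dfs.balance.bandwidthPerSec default
        final long numBytes = 64L * 1024 * 1024; // a 64 MB block, for illustration

        int soTimeout = 2 * READ_TIMEOUT + (int) (numBytes * 1500 / bandwidth);
        // 2*60000 + 96000 = 216000 ms, i.e. 3.6 minutes for this block
        System.out.println("soTimeout = " + soTimeout + " ms");
      }
    }
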
Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java?rev=708637&r1=708636&r2=708637&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DataNode.java Tue Oct 28 12:13:39 2008
@@ -32,7 +32,6 @@
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.dfs.IncorrectVersionException;
 import org.apache.hadoop.mapred.StatusHttpServer;
-import org.apache.hadoop.dfs.Balancer;
 import org.apache.hadoop.dfs.BlockCommand;
 import org.apache.hadoop.dfs.DatanodeProtocol;
 import org.apache.hadoop.dfs.FSDatasetInterface.MetaDataInputStream;
@@ -46,6 +45,7 @@
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.*;
+import java.util.concurrent.Semaphore;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 
@@ -146,45 +146,6 @@
   private static final int MAX_XCEIVER_COUNT = 256;
   private int maxXceiverCount = MAX_XCEIVER_COUNT;
   
-  /** A manager to make sure that cluster balancing does not
-   * take too much resources.
-   * 
-   * It limits the number of block moves for balancing and
-   * the total amount of bandwidth they can use.
-   */
-  private static class BlockBalanceThrottler extends Throttler {
-   private int numThreads;
-   
-   /**Constructor
-    * 
-    * @param bandwidth Total amount of bandwidth can be used for balancing 
-    */
-   private BlockBalanceThrottler(long bandwidth) {
-     super(bandwidth);
-     LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
-   }
-   
-   /** Check if the block move can start. 
-    * 
-    * Return true if the thread quota is not exceeded and 
-    * the counter is incremented; False otherwise.
-    */
-   private synchronized boolean acquire() {
-     if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
-       return false;
-     }
-     numThreads++;
-     return true;
-   }
-   
-   /** Mark that the move is completed. The thread counter is decremented. */
-   private synchronized void release() {
-     numThreads--;
-   }
-  }
-
-  private BlockBalanceThrottler balanceThrottler;
-  
   /**
    * We need an estimate for block size to check if the disk partition has
    * enough space. For now we set it to be the default block size set
@@ -195,6 +156,12 @@
    */
   private long estimateBlockSize;
   
+  // The following three fields are to support balancing
+  final static short MAX_BALANCING_THREADS = 5;
+  private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
+  long balanceBandwidth;
+  private Throttler balancingThrottler;
+
   // For InterDataNodeProtocol
   Server ipcServer;
   
@@ -341,8 +308,9 @@
     DataNode.nameNodeAddr = nameNodeAddr;
 
     //set up parameter for cluster balancing
-    this.balanceThrottler = new BlockBalanceThrottler(
-      conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
+    this.balanceBandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
+    LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
+    this.balancingThrottler = new Throttler(balanceBandwidth);
 
     //initialize periodic block scanner
     String reason = null;
@@ -916,6 +884,24 @@
     }
   }
 
+  /* utility function for receiving a response */
+  private static void receiveResponse(Socket s, int numTargets) throws IOException {
+    // check the response
+    DataInputStream reply = new DataInputStream(new BufferedInputStream(
+                                NetUtils.getInputStream(s), BUFFER_SIZE));
+    try {
+      for (int i = 0; i < numTargets; i++) {
+        short opStatus = reply.readShort();
+        if(opStatus != OP_STATUS_SUCCESS) {
+          throw new IOException("operation failed at "+
+              s.getInetAddress());
+        } 
+      }
+    } finally {
+      IOUtils.closeStream(reply);
+    }
+  }
+
   /* utility function for sending a respose */
   private static void sendResponse(Socket s, short opStatus, long timeout) 
                                                        throws IOException {
@@ -959,7 +945,6 @@
       this.ss = ss;
     }
 
-
     /**
      */
     public void run() {
@@ -1375,50 +1360,67 @@
       // Read in the header
       long blockId = in.readLong(); // read block id
       Block block = new Block(blockId, 0, in.readLong());
+      String source = Text.readString(in); // read del hint
+      DatanodeInfo target = new DatanodeInfo(); // read target
+      target.readFields(in);
 
-      if (!balanceThrottler.acquire()) { // not able to start
-        LOG.info("Not able to copy block " + blockId + " to "
-            + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
-        return;
-      }
-
+      Socket targetSock = null;
+      short opStatus = OP_STATUS_SUCCESS;
       BlockSender blockSender = null;
-      DataOutputStream reply = null;
-      boolean isOpSuccess = true;
-
+      DataOutputStream targetOut = null;
       try {
+        balancingSem.acquireUninterruptibly();
+        
         // check if the block exists or not
         blockSender = new BlockSender(block, 0, -1, false, false, false);
 
-        // set up response stream
-        OutputStream baseStream = NetUtils.getOutputStream(
-            s, socketWriteTimeout);
-        reply = new DataOutputStream(new BufferedOutputStream(
-            baseStream, SMALL_BUFFER_SIZE));
-
+        // get the output stream to the target
+        InetSocketAddress targetAddr = NetUtils.createSocketAddr(target.getName());
+        targetSock = newSocket();
+        targetSock.connect(targetAddr, socketTimeout);
+        targetSock.setSoTimeout(socketTimeout);
+
+        OutputStream baseStream = NetUtils.getOutputStream(targetSock, 
+                                                            socketWriteTimeout);
+        targetOut = new DataOutputStream(
+                       new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
 
-        // send block content to the target 
-        long read = blockSender.sendBlock(reply, baseStream, 
-                                          balanceThrottler);
+        /* send request to the target */
+        // fist write header info
+        targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
+        targetOut.writeByte(OP_REPLACE_BLOCK); // op code
+        targetOut.writeLong(block.getBlockId()); // block id
+        targetOut.writeLong(block.getGenerationStamp()); // block id
+        Text.writeString( targetOut, source); // del hint
+
+        // then send data
+        long read = blockSender.sendBlock(targetOut, baseStream, 
+                                          balancingThrottler);
 
         myMetrics.bytesRead.inc((int) read);
         myMetrics.blocksRead.inc();
         
-        LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
+        // check the response from target
+        receiveResponse(targetSock, 1);
+
+        LOG.info("Copied block " + block + " to " + targetAddr);
       } catch (IOException ioe) {
-        isOpSuccess = false;
+        opStatus = OP_STATUS_ERROR;
+        LOG.warn("Got exception while serving " + block + " to "
+            + target.getName() + ": " + StringUtils.stringifyException(ioe));
         throw ioe;
       } finally {
-        balanceThrottler.release();
-        if (isOpSuccess) {
-          try {
-            // send one last byte to indicate that the resource is cleaned.
-            reply.writeChar('d');
-          } catch (IOException ignored) {
-          }
+        /* send response to the requester */
+        try {
+          sendResponse(s, opStatus, socketWriteTimeout);
+        } catch (IOException replyE) {
+          LOG.warn("Error writing the response back to "+
+              s.getRemoteSocketAddress() + "\n" +
+              StringUtils.stringifyException(replyE) );
         }
-        IOUtils.closeStream(reply);
+        IOUtils.closeStream(targetOut);
         IOUtils.closeStream(blockSender);
+        balancingSem.release();
       }
     }
 
@@ -1431,59 +1433,21 @@
      * @throws IOException
      */
     private void replaceBlock(DataInputStream in) throws IOException {
-        /* read header */
-        long blockId = in.readLong();
-        Block block = new Block(blockId, estimateBlockSize,
-            in.readLong()); // block id & generation stamp
-        String sourceID = Text.readString(in); // read del hint
-        DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
-        proxySource.readFields(in);
-  
-        if (!balanceThrottler.acquire()) { // not able to start
-          LOG.warn("Not able to receive block " + blockId + " from " 
-              + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
-          sendResponse(s, (short)OP_STATUS_ERROR, 
-              socketWriteTimeout);
-          return;
-        }
-  
-        Socket proxySock = null;
-        DataOutputStream proxyOut = null;
-  
-        short opStatus = OP_STATUS_SUCCESS;
-        BlockReceiver blockReceiver = null;
-        DataInputStream proxyReply = null;
-  
-        try {
-        // get the output stream to the proxy
-        InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
-            proxySource.getName());
-        proxySock = newSocket();
-        proxySock.connect(proxyAddr, socketTimeout);
-        proxySock.setSoTimeout(socketTimeout);
-  
-        OutputStream baseStream = NetUtils.getOutputStream(proxySock, 
-            socketWriteTimeout);
-        proxyOut = new DataOutputStream(
-                       new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-  
-        /* send request to the proxy */
-        proxyOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
-        proxyOut.writeByte(OP_COPY_BLOCK); // op code
-        proxyOut.writeLong(block.getBlockId()); // block id
-        proxyOut.writeLong(block.getGenerationStamp()); // block id
-        proxyOut.flush();
-  
-        // receive the response from the proxy
-        proxyReply = new DataInputStream(new BufferedInputStream(
-            NetUtils.getInputStream(proxySock), BUFFER_SIZE));
+      balancingSem.acquireUninterruptibly();
+
+      /* read header */
+      Block block = new Block(in.readLong(), estimateBlockSize, in.readLong()); // block id & len
+      String sourceID = Text.readString(in);
+
+      short opStatus = OP_STATUS_SUCCESS;
+      BlockReceiver blockReceiver = null;
+      try {
         // open a block receiver and check if the block does not exist
-        blockReceiver = new BlockReceiver(
-            block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
-            false, "", null);
+         blockReceiver = new BlockReceiver(
+            block, in, s.getRemoteSocketAddress().toString(), false, "", null);
 
         // receive a block
-        blockReceiver.receiveBlock(null, null, null, null, balanceThrottler, -1);
+        blockReceiver.receiveBlock(null, null, null, null, balancingThrottler, -1);
                       
         // notify name node
         notifyNamenodeReceivedBlock(block, sourceID);
@@ -1494,26 +1458,14 @@
         opStatus = OP_STATUS_ERROR;
         throw ioe;
       } finally {
-        // receive the last byte that indicates the proxy released its thread resource
-        if (opStatus == OP_STATUS_SUCCESS) {
-          try {
-            proxyReply.readChar();
-          } catch (IOException ignored) {
-          }
-        }
-      
-        // now release the thread resource
-        balanceThrottler.release();
-      
         // send response back
         try {
           sendResponse(s, opStatus, socketWriteTimeout);
         } catch (IOException ioe) {
           LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
         }
-        IOUtils.closeStream(proxyOut);
         IOUtils.closeStream(blockReceiver);
-        IOUtils.closeStream(proxyReply);
+        balancingSem.release();
       }
     }
   }

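The DataNode changes above swap out the HADOOP-4116 BlockBalanceThrottler, which refuses a
copy or replace request outright once MAX_NUM_CONCURRENT_MOVES threads are busy, and restore
the older scheme in which copyBlock() and replaceBlock() block on a Semaphore until one of
the MAX_BALANCING_THREADS permits frees up. A minimal sketch of the two admission styles,
kept separate from the actual DataNode code:

    // Minimal sketch contrasting the two admission schemes in this diff:
    // the reverted-to code blocks on a Semaphore permit, while the
    // HADOOP-4116 throttler rejected the move when the quota was exhausted.
    import java.util.concurrent.Semaphore;

    class BalancingAdmissionSketch {
      static final int MAX_BALANCING_THREADS = 5;
      private final Semaphore permits = new Semaphore(MAX_BALANCING_THREADS);
      private int numThreads; // used only by the non-blocking variant

      // Reverted-to style: wait until a permit is available, then move.
      void blockingMove(Runnable move) {
        permits.acquireUninterruptibly();
        try {
          move.run();
        } finally {
          permits.release();
        }
      }

      // HADOOP-4116 style: refuse immediately if the quota is exceeded;
      // the caller logs "threads quota is exceeded" and gives up.
      synchronized boolean tryStartMove() {
        if (numThreads >= MAX_BALANCING_THREADS) {
          return false;
        }
        numThreads++;
        return true;
      }

      synchronized void finishMove() {
        numThreads--;
      }
    }
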
Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSConstants.java?rev=708637&r1=708636&r2=708637&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSConstants.java Tue Oct 28 12:13:39 2008
@@ -100,15 +100,12 @@
    * This should change when serialization of DatanodeInfo, not just
    * when protocol changes. It is not very obvious. 
    */
-  /* Version 14:
-   *    OP_REPLACE_BLOCK is sent from the Balancer server to the destination,
-   *    including the block id, source, and proxy.
-   *    OP_COPY_BLOCK is sent from the destination to the proxy, which contains
-   *    only the block id.
-   *    A reply to OP_COPY_BLOCK sends the block content.
-   *    A reply to OP_REPLACE_BLOCK includes an operation status.
+  /*
+   * Version 11:
+   *    OP_WRITE_BLOCK sends a boolean. If its value is true, an additonal 
+   *    DatanodeInfo of client requesting transfer is also sent. 
    */
-  public static final int DATA_TRANSFER_VERSION = 14;
+  public static final int DATA_TRANSFER_VERSION = 11;
 
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;

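The DATA_TRANSFER_VERSION revert above (14 back to 11) is what the incompatibility in the
log message comes down to: the receiving side of a data-transfer connection reads the peer's
version short first and rejects anything that does not match its own constant, so a 0.18.1
datanode and a 0.18.2 datanode carrying HADOOP-4116 cannot exchange blocks. A minimal sketch
of that handshake check, with illustrative names rather than the actual DataXceiver code:

    // Minimal sketch of a receiver-side version check; names are
    // illustrative, not the actual 0.18 DataXceiver implementation.
    import java.io.DataInputStream;
    import java.io.IOException;

    class VersionCheckSketch {
      static final int DATA_TRANSFER_VERSION = 11; // value restored by this revert

      static void checkVersion(DataInputStream in) throws IOException {
        short version = in.readShort();
        if (version != DATA_TRANSFER_VERSION) {
          // A peer still speaking version 14 (HADOOP-4116) fails here.
          throw new IOException("Version mismatch: expected "
              + DATA_TRANSFER_VERSION + " but got " + version);
        }
      }
    }
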
Modified: hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java?rev=708637&r1=708636&r2=708637&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java (original)
+++ hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java Tue Oct 28 12:13:39 2008
@@ -203,7 +203,7 @@
   }
 
   /* Copy a block from sourceProxy to destination. If the block becomes
-   * over-replicated, preferably remove it from source.
+   * overreplicated, preferrably remove it from source.
    * 
    * Return true if a block is successfully copied; otherwise false.
    */
@@ -211,16 +211,16 @@
       DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
     Socket sock = new Socket();
     sock.connect(NetUtils.createSocketAddr(
-        destination.getName()), FSConstants.READ_TIMEOUT);
-    sock.setKeepAlive(true);
+        sourceProxy.getName()), FSConstants.READ_TIMEOUT);
+    sock.setSoTimeout(FSConstants.READ_TIMEOUT);
     // sendRequest
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
     out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
-    out.writeByte(FSConstants.OP_REPLACE_BLOCK);
+    out.writeByte(FSConstants.OP_COPY_BLOCK);
     out.writeLong(block.getBlockId());
     out.writeLong(block.getGenerationStamp());
     Text.writeString(out, source.getStorageID());
-    sourceProxy.write(out);
+    destination.write(out);
     out.flush();
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());


