hadoop-common-commits mailing list archives

From dhr...@apache.org
Subject svn commit: r591103 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Date Thu, 01 Nov 2007 18:08:41 GMT
Author: dhruba
Date: Thu Nov  1 11:08:40 2007
New Revision: 591103

URL: http://svn.apache.org/viewvc?rev=591103&view=rev
Log:
HADOOP-1912. Datanode has two new commands, COPY and REPLACE, needed
to support data rebalancing.  (Hairong Kuang via dhruba)
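
For context, the flow these two commands enable is: balancer client → proxy
source (OP_COPY_BLOCK) → destination (OP_REPLACE_BLOCK) → namenode
(blockReceived with a deletion hint). A minimal sketch of issuing the first
hop, modeled on TestBlockReplacement.replaceBlock added below (the variables
proxy, source, destination, and block are illustrative, not part of the patch):

    // ask the proxy to copy `block` to `destination`, hinting that the
    // replica on `source` may later be deleted
    Socket sock = new Socket();
    sock.connect(DataNode.createSocketAddr(proxy.getName()),
        FSConstants.READ_TIMEOUT);
    DataOutputStream out = new DataOutputStream(sock.getOutputStream());
    out.writeShort(FSConstants.DATA_TRANFER_VERSION); // now 7
    out.writeByte(FSConstants.OP_COPY_BLOCK);         // new opcode 84
    out.writeLong(block.getBlockId());
    Text.writeString(out, source.getStorageID());     // deletion hint
    destination.write(out);                           // target DatanodeInfo
    out.flush();
    short status = new DataInputStream(sock.getInputStream()).readShort();
    boolean copied = (status == FSConstants.OP_STATUS_SUCCESS);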


Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java   (with props)
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=591103&r1=591102&r2=591103&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Nov  1 11:08:40 2007
@@ -35,6 +35,9 @@
 
     HADOOP-1210.  Log counters in job history. (Owen O'Malley via ddas)
 
+    HADOOP-1912. Datanode has two new commands, COPY and REPLACE, needed
+    to support data rebalancing.  (Hairong Kuang via dhruba)
+
   OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=591103&r1=591102&r2=591103&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Nov  1 11:08:40 2007
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -36,8 +37,10 @@
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.util.concurrent.Semaphore;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
+
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.Updater;
@@ -109,7 +112,9 @@
   DatanodeRegistration dnRegistration = null;
   private String networkLoc;
   volatile boolean shouldRun = true;
-  LinkedList<Block> receivedBlockList = new LinkedList<Block>();
+  private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
+  private LinkedList<String> delHints = new LinkedList<String>();
+  final private static String EMPTY_DEL_HINT = "";
   int xmitsInProgress = 0;
   Daemon dataXceiveServer = null;
   long blockReportInterval;
@@ -124,6 +129,13 @@
   private Thread dataNodeThread = null;
   String machineName;
   int defaultBytesPerChecksum = 512;
+
+  // The following three fields are to support balancing
+  final private static long BALANCE_BANDWIDTH = 1024L*1024; // 1MB/s
+  final private static short MAX_BALANCING_THREADS = 5;
+  private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
+  private Throttler balancingThrottler = new Throttler(BALANCE_BANDWIDTH);
+
   private static class DataNodeMetrics implements Updater {
     private final MetricsRecord metricsRecord;
     private int bytesWritten = 0;
@@ -531,19 +543,28 @@
             
         // check if there are newly received blocks
         Block [] blockArray=null;
+        String [] delHintArray=null;
         synchronized(receivedBlockList) {
-          if (receivedBlockList.size() > 0) {
-            //
-            // Send newly-received blockids to namenode
-            //
-            blockArray = receivedBlockList.toArray(new Block[receivedBlockList.size()]);
+          synchronized(delHints) {
+            int numBlocks = receivedBlockList.size();
+            if (receivedBlockList.size() > 0) {
+              assert(numBlocks==delHints.size());
+              //
+              // Send newly-received blockids to namenode
+              //
+              blockArray = receivedBlockList.toArray(new Block[numBlocks]);
+              delHintArray = delHints.toArray(new String[numBlocks]);
+            }
           }
         }
         if (blockArray != null) {
-          namenode.blockReceived(dnRegistration, blockArray);
+          namenode.blockReceived(dnRegistration, blockArray, delHintArray);
           synchronized (receivedBlockList) {
-            for(Block b: blockArray) {
-              receivedBlockList.remove(b);
+            synchronized (delHints) {
+              for(int i=0; i<blockArray.length; i++) {
+                receivedBlockList.remove(blockArray[i]);
+                delHints.remove(delHintArray[i]);
+              }
             }
           }
         }
@@ -725,15 +746,16 @@
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
-  private void notifyNamenodeReceivedBlock(Block block) {
+  private void notifyNamenodeReceivedBlock(Block block, String delHint) {
     synchronized (receivedBlockList) {
-      receivedBlockList.add(block);
-      receivedBlockList.notifyAll();
+      synchronized (delHints) {
+        receivedBlockList.add(block);
+        delHints.add(delHint);
+        receivedBlockList.notifyAll();
+      }
     }
   }
 
-
-
   /**
    * Server used for receiving/sending a block of data.
    * This is created to listen for requests from clients or 
@@ -752,8 +774,6 @@
       try {
         while (shouldRun) {
           Socket s = ss.accept();
-          //s.setSoTimeout(READ_TIMEOUT);
-          xceiverCount.incr();
           new Daemon(new DataXceiver(s)).start();
         }
         ss.close();
@@ -806,13 +826,18 @@
         case OP_READ_METADATA:
           readMetadata( in );
           break;
+        case OP_REPLACE_BLOCK: // for balancing purposes; sent to the destination
+          replaceBlock(in);
+          break;
+        case OP_COPY_BLOCK: // for balancing purposes; sent to a proxy source
+          copyBlock(in);
+          break;
         default:
           throw new IOException("Unknown opcode " + op + "in data stream");
         }
        } catch (Throwable t) {
         LOG.error("DataXceiver: " + StringUtils.stringifyException(t));
       } finally {
-        xceiverCount.decr();
         LOG.debug("Number of active connections is: "+xceiverCount);
         IOUtils.closeStream(in);
         IOUtils.closeSocket(s);
@@ -825,6 +850,7 @@
      * @throws IOException
      */
     private void readBlock(DataInputStream in) throws IOException {
+      xceiverCount.incr();
       //
       // Read in the header
       //
@@ -864,6 +890,7 @@
                   StringUtils.stringifyException(ioe) );
         throw ioe;
       } finally {
+        xceiverCount.decr();
         IOUtils.closeStream(out);
         IOUtils.closeStream(blockSender);
       }
@@ -876,6 +903,7 @@
      * @throws IOException
      */
     private void writeBlock(DataInputStream in) throws IOException {
+      xceiverCount.incr();
       //
       // Read in the header
       //
@@ -935,10 +963,11 @@
 
         // receive the block and mirror to the next target
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+
         blockReceiver.receiveBlock(mirrorOut, mirrorAddr, null);
 
         // notify name node
-        notifyNamenodeReceivedBlock(block);
+        notifyNamenodeReceivedBlock(block, EMPTY_DEL_HINT);
 
         String msg = "Received block " + block + " from " +
                      s.getInetAddress();
@@ -971,6 +1000,8 @@
         IOUtils.closeStream(mirrorOut);
         IOUtils.closeSocket(mirrorSock);
         IOUtils.closeStream(blockReceiver);
+        // decrement counter
+        xceiverCount.decr();
       }
     }
 
@@ -979,7 +1010,8 @@
      * @param in
      */
     void readMetadata(DataInputStream in) throws IOException {
-      
+      xceiverCount.incr();
+
       Block block = new Block( in.readLong(), 0 );
       InputStream checksumIn = null;
       DataOutputStream out = null;
@@ -1007,21 +1039,190 @@
         //last DATA_CHUNK
         out.writeInt(0);
       } finally {
+        xceiverCount.decr();
         IOUtils.closeStream(checksumIn);
       }
     }
+    
+    /**
+     * Reads a block from the disk and then sends it to the destination
+     * 
+     * @param in
+     *          The stream to read from
+     * @throws IOException
+     */
+    private void copyBlock(DataInputStream in) throws IOException {
+      // Read in the header
+      long blockId = in.readLong(); // read block id
+      Block block = new Block(blockId, 0);
+      String source = Text.readString(in); // read del hint
+      DatanodeInfo target = new DatanodeInfo(); // read target
+      target.readFields(in);
+
+      Socket targetSock = null;
+      short opStatus = OP_STATUS_SUCCESS;
+      BlockSender blockSender = null;
+      DataOutputStream targetOut = null;
+      try {
+        balancingSem.acquireUninterruptibly();
+        
+        // check if the block exists or not
+        blockSender = new BlockSender(block, 0, -1, false, false);
+
+        // get the output stream to the target
+        InetSocketAddress targetAddr = createSocketAddr(target.getName());
+        targetSock = new Socket();
+        targetSock.connect(targetAddr, READ_TIMEOUT);
+        targetSock.setSoTimeout(READ_TIMEOUT);
+
+        targetOut = new DataOutputStream(new BufferedOutputStream(
+            targetSock.getOutputStream(), BUFFER_SIZE));
+
+        /* send request to the target */
+        // first write the header info
+        targetOut.writeShort(DATA_TRANFER_VERSION); // transfer version
+        targetOut.writeByte(OP_REPLACE_BLOCK); // op code
+        targetOut.writeLong(block.getBlockId()); // block id
+        Text.writeString( targetOut, source); // del hint
+
+        // then send data
+        long read = blockSender.sendBlock(targetOut, balancingThrottler);
+
+        myMetrics.readBytes((int) read);
+        myMetrics.readBlocks(1);
+        
+        // check the response from target
+        receiveResponse(targetSock);
+
+        LOG.info("Copied block " + block + " to " + targetAddr);
+      } catch (IOException ioe) {
+        opStatus = OP_STATUS_ERROR;
+        LOG.warn("Got exception while serving " + block + " to "
+            + target.getName() + ": " + StringUtils.stringifyException(ioe));
+        throw ioe;
+      } finally {
+        /* send response to the requester */
+        try {
+          sendResponse(s, opStatus);
+        } catch (IOException replyE) {
+          LOG.warn("Error writing the response back to "+
+              s.getRemoteSocketAddress() + "\n" +
+              StringUtils.stringifyException(replyE) );
+        }
+        IOUtils.closeStream(targetOut);
+        IOUtils.closeStream(blockSender);
+        balancingSem.release();
+      }
+    }
+
+    /**
+     * Receives a block and writes it to disk; it then notifies the namenode
+     * to remove the copy from the source
+     * 
+     * @param in
+     *          The stream to read from
+     * @throws IOException
+     */
+    private void replaceBlock(DataInputStream in) throws IOException {
+      balancingSem.acquireUninterruptibly();
+
+      /* read header */
+      Block block = new Block(in.readLong(), 0); // block id & len
+      String sourceID = Text.readString(in);
+
+      short opStatus = OP_STATUS_SUCCESS;
+      BlockReceiver blockReceiver = null;
+      try {
+        // open a block receiver; it verifies that the block does not already exist
+        blockReceiver = new BlockReceiver(
+            block, in, s.getRemoteSocketAddress().toString());
+
+        // receive a block
+        blockReceiver.receiveBlock(null, null, balancingThrottler);
+        
+        // notify name node
+        notifyNamenodeReceivedBlock(block, sourceID);
+
+        LOG.info("Received block " + block + 
+            " from " + s.getRemoteSocketAddress());
+      } catch (IOException ioe) {
+        opStatus = OP_STATUS_ERROR;
+        throw ioe;
+      } finally {
+        // send response back
+        try {
+          sendResponse(s, opStatus);
+        } catch (IOException ioe) {
+          LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
+        }
+        IOUtils.closeStream(blockReceiver);
+        balancingSem.release();
+      }
+    }
   }
     
-  /* An interface to throttle the block transfers */
-  private interface Throttler {
+  /** A class to throttle block transfers.
+   * This class is thread safe; it can be shared by multiple threads.
+   * The parameter bandwidthPerSec specifies the total bandwidth shared by all threads.
+   */
+  static class Throttler {
+    private long period;          // period over which bw is imposed
+    private long periodExtension; // Max period over which bw accumulates.
+    private long bytesPerPeriod; // total number of bytes that can be sent in each period
+    private long curPeriodStart; // current period starting time
+    private long curReserve;     // remaining bytes that can be sent in this period
+    private long bytesAlreadyUsed;
+
+    /** Constructor */
+    Throttler(long bandwidthPerSec) {
+      this(1000, bandwidthPerSec);  // by default throttling period is 1s
+    }
+
+    /** Constructor */
+    Throttler(long period, long bandwidthPerSec) {
+      this.curPeriodStart = System.currentTimeMillis();
+      this.period = period;
+      this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
+      this.periodExtension = period*3;
+    }
+
     /** Given the numOfBytes sent/received since last time throttle was called,
-     * make the current thread sleep if I/O rate is too fast 
+     * make the current thread sleep if I/O rate is too fast
      * compared to the given bandwidth
-     * 
-     * @param numOfBytes 
+     *
+     * @param numOfBytes
      *     number of bytes sent/received since last time throttle was called
-     */ 
-    void throttle(int numOfBytes);
+     */
+    public synchronized void throttle(long numOfBytes) {
+      if ( numOfBytes <= 0 ) {
+        return;
+      }
+
+      curReserve -= numOfBytes;
+      bytesAlreadyUsed += numOfBytes;
+
+      while (curReserve <= 0) {
+        long now = System.currentTimeMillis();
+        long curPeriodEnd = curPeriodStart + period;
+
+        if ( now < curPeriodEnd ) {
+          // Wait for next period so that curReserve can be increased.
+          try {
+            wait( curPeriodEnd - now );
+          } catch (InterruptedException ignored) {}
+        } else if ( now <  (curPeriodStart + periodExtension)) {
+          curPeriodStart = curPeriodEnd;
+          curReserve += bytesPerPeriod;
+        } else {
+          // discard the prev period. Throttler might not have
+          // been used for a long time.
+          curPeriodStart = now;
+          curReserve = bytesPerPeriod - bytesAlreadyUsed;
+        }
+      }
+
+      bytesAlreadyUsed -= numOfBytes;
+    }
   }
 
   private class BlockSender implements java.io.Closeable {
@@ -1175,7 +1376,7 @@
       out.write(buf, 0, len + checksumSize);
 
       if (throttler != null) { // rebalancing so throttle
-        throttler.throttle(len + checksumSize);
+        throttler.throttle(len + checksumSize + 4);
       }
 
       return len;
@@ -1354,7 +1555,7 @@
       }
 
       if (throttler != null) { // throttle I/O
-        throttler.throttle(len + checksumSize);
+        throttler.throttle(len + checksumSize + 4);
       }
     }
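
For a rough sense of the new Throttler's arithmetic: with the default 1 s
period and bandwidthPerSec = 1 MB/s, bytesPerPeriod is 1,048,576, so a thread
that has pushed that many bytes within a period blocks inside throttle() until
the period rolls over. A hedged usage sketch (out, hasMoreData, and fillChunk
are hypothetical, not from the patch):

    DataNode.Throttler throttler = new DataNode.Throttler(1024L * 1024); // 1 MB/s
    byte[] buf = new byte[64 * 1024];
    while (hasMoreData()) {
      int len = fillChunk(buf);      // hypothetical producer
      out.write(buf, 0, len);
      throttler.throttle(len);       // sleeps once curReserve is exhausted
    }

Since throttle() is synchronized, one instance can be shared: the balancing
code above shares a single balancingThrottler across up to
MAX_BALANCING_THREADS (5) concurrent transfers, so the 1 MB/s cap is
aggregate rather than per-thread.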
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=591103&r1=591102&r2=591103&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Thu Nov  1 11:08:40 2007
@@ -31,9 +31,9 @@
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /*
-   * 9: heartbeat sends also the data node used space;
+   * 10: blockReceived also sends hints for deletion
    */
-  public static final long versionID = 9L;
+  public static final long versionID = 10L;
   
   // error code
   final static int NOTIFY = 0;
@@ -89,12 +89,15 @@
     
   /**
    * blockReceived() allows the DataNode to tell the NameNode about
-   * recently-received block data.  For example, whenever client code
+   * recently-received block data, with a hint for the preferred replica
+   * to be deleted when there are excess blocks.
+   * For example, whenever client code
    * writes a new Block here, or another DataNode copies a Block to
    * this DataNode, it will call blockReceived().
    */
   public void blockReceived(DatanodeRegistration registration,
-                            Block blocks[]) throws IOException;
+                            Block blocks[],
+                            String[] delHints) throws IOException;
 
   /**
    * errorReport() tells the NameNode about something that has gone
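
On the datanode side the two arrays are kept index-aligned (see the
offerService hunk in DataNode.java above); condensed, the reporting step is
(a sketch, with numBlocks as in that hunk):

    // blocks[i] pairs with hints[i]: EMPTY_DEL_HINT ("") for an ordinary
    // write, or a storageID when the block arrived via OP_REPLACE_BLOCK and
    // that node's replica should preferably be deleted
    Block[] blocks = receivedBlockList.toArray(new Block[numBlocks]);
    String[] hints = delHints.toArray(new String[numBlocks]);
    namenode.blockReceived(dnRegistration, blocks, hints);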

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=591103&r1=591102&r2=591103&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Thu Nov  1 11:08:40 2007
@@ -85,6 +85,8 @@
   public static final byte OP_WRITE_BLOCK = (byte) 80;
   public static final byte OP_READ_BLOCK = (byte) 81;
   public static final byte OP_READ_METADATA = (byte) 82;
+  public static final byte OP_REPLACE_BLOCK = (byte) 83;
+  public static final byte OP_COPY_BLOCK = (byte) 84;
   
   public static final int OP_STATUS_SUCCESS = 0;  
   public static final int OP_STATUS_ERROR = 1;  
@@ -97,13 +99,21 @@
    * This should change when serialization of DatanodeInfo, not just
    * when protocol changes. It is not very obvious. 
    */
-  /* Version 6: 
-   * 0 marks the end of a block not an EMPTY_CHUNK
-   * OP_READ_BLOCK: return OP_STATUS_ERROR if received an invalid block id
-   *                return OP_STATUS_ERROR if received an invalid length
-   * OP_WRITE_BLOCK: return OP_STATUS_ERROR if illegal bytesPerChecksum
+  /* Version 7: 
+   * Adds two operations to the datanode:
+   * OP_COPY_BLOCK: 
+   *   This command is sent to a proxy source for balancing purposes.
+   *   That datanode then sends an OP_REPLACE_BLOCK request to the destination.
+   *   OP_COPY_BLOCK BlockID(long) SourceID(UTF8) Destination(DatanodeInfo)
+   *   returns OP_STATUS_ERROR if any error occurs, OP_STATUS_SUCCESS otherwise
+   * OP_REPLACE_BLOCK: 
+   *   This command is sent to the destination for balancing purposes.
+   *   That datanode writes the block to disk and notifies the namenode of the
+   *   received block, together with a deletion hint: the sourceID.
+   *   OP_REPLACE_BLOCK BlockID(long) SourceID(UTF8) Block_Data_With_Crc
+   *   returns OP_STATUS_ERROR if any error occurs, OP_STATUS_SUCCESS otherwise
    */
-  public static final int DATA_TRANFER_VERSION = 6; //Should it be 1?
+  public static final int DATA_TRANFER_VERSION = 7;
 
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;
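
To make the Version 7 format concrete, these are the header fields the proxy
source writes for the second hop, copied in spirit from DataNode.copyBlock
above (the checksummed block data follows, and the receiver answers with a
single status short):

    targetOut.writeShort(DATA_TRANFER_VERSION); // 7
    targetOut.writeByte(OP_REPLACE_BLOCK);      // opcode 83
    targetOut.writeLong(block.getBlockId());    // block id
    Text.writeString(targetOut, source);        // deletion hint (storageID)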

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=591103&r1=591102&r2=591103&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Nov  1 11:08:40 2007
@@ -606,24 +606,22 @@
    * return the length of the added block; 0 if the block is not added
    */
   private long addBlock(Block block, List<BlockWithLocations> results) {
-    int numNodes = blocksMap.numNodes(block);
-    String[] machineSet = new String[numNodes];
-    if (numNodes > 0) {
-      numNodes = 0;
-      for(Iterator<DatanodeDescriptor> it = 
-          blocksMap.nodeIterator(block); it.hasNext();) {
-        String storageID = it.next().getStorageID();
-        // filter invalidate replicas
-        Collection<Block> blocks = recentInvalidateSets.get(storageID); 
-        if(blocks==null || !blocks.contains(block)) {
-          machineSet[numNodes++] = storageID;
-        }
+    ArrayList<String> machineSet =
+      new ArrayList<String>(blocksMap.numNodes(block));
+    for(Iterator<DatanodeDescriptor> it = 
+      blocksMap.nodeIterator(block); it.hasNext();) {
+      String storageID = it.next().getStorageID();
+      // filter out invalidated replicas
+      Collection<Block> blocks = recentInvalidateSets.get(storageID); 
+      if(blocks==null || !blocks.contains(block)) {
+        machineSet.add(storageID);
       }
     }
-    if(numNodes == 0) {
+    if(machineSet.size() == 0) {
       return 0;
     } else {
-      results.add(new BlockWithLocations(block, machineSet));
+      results.add(new BlockWithLocations(block, 
+          machineSet.toArray(new String[machineSet.size()])));
       return block.getNumBytes();
     }
   }
@@ -770,7 +768,7 @@
       LOG.info("Reducing replication for file " + src 
                + ". New replication is " + replication);
       for(int idx = 0; idx < fileBlocks.length; idx++)
-        proccessOverReplicatedBlock(fileBlocks[idx], replication);
+        proccessOverReplicatedBlock(fileBlocks[idx], replication, null, null);
     }
     return true;
   }
@@ -2219,7 +2217,7 @@
       removeStoredBlock(b, node);
     }
     for (Block b : toAdd) {
-      addStoredBlock(b, node);
+      addStoredBlock(b, node, null);
     }
         
     //
@@ -2262,7 +2260,9 @@
    * needed replications if this takes care of the problem.
    * @return the block that is stored in blockMap.
    */
-  synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
+  synchronized Block addStoredBlock(Block block, 
+                                    DatanodeDescriptor node,
+                                    DatanodeDescriptor delNodeHint) {
         
     INodeFile fileINode = blocksMap.getINode(block);
     int replication = (fileINode != null) ?  fileINode.getReplication() : 
@@ -2333,7 +2333,7 @@
       updateNeededReplications(block, curReplicaDelta, 0);
     }
     if (numCurrentReplica > fileReplication) {
-      proccessOverReplicatedBlock(block, fileReplication);
+      proccessOverReplicatedBlock(block, fileReplication, node, delNodeHint);
     }
     return block;
   }
@@ -2343,7 +2343,11 @@
    * If there are any extras, call chooseExcessReplicates() to
    * mark them in the excessReplicateMap.
    */
-  private void proccessOverReplicatedBlock(Block block, short replication) {
+  private void proccessOverReplicatedBlock(Block block, short replication, 
+      DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
+    if(addedNode == delNodeHint) {
+      delNodeHint = null;
+    }
     Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
     for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block); 
          it.hasNext();) {
@@ -2355,7 +2359,8 @@
         }
       }
     }
-    chooseExcessReplicates(nonExcess, block, replication);    
+    chooseExcessReplicates(nonExcess, block, replication, 
+        addedNode, delNodeHint);    
   }
 
   /**
@@ -2373,7 +2378,9 @@
    * then pick a node with least free space
    */
   void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
-                              Block b, short replication) {
+                              Block b, short replication,
+                              DatanodeDescriptor addedNode,
+                              DatanodeDescriptor delNodeHint) {
     // first form a rack to datanodes map and
     HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
       new HashMap<String, ArrayList<DatanodeDescriptor>>();
@@ -2405,22 +2412,29 @@
       }
     }
     
-    // pick one node with least space from priSet if it is not empty
+    // pick one node to delete that favors the delete hint
+    // otherwise pick one with least space from priSet if it is not empty
     // otherwise one node with least space from remains
     while (nonExcess.size() - replication > 0) {
       DatanodeInfo cur = null;
       long minSpace = Long.MAX_VALUE;
 
-      Iterator<DatanodeDescriptor> iter = 
-        priSet.isEmpty() ? remains.iterator() : priSet.iterator();
-      while( iter.hasNext() ) {
-        DatanodeDescriptor node = iter.next();
-        long free = node.getRemaining();
-                
-        if (minSpace > free) {
-          minSpace = free;
-          cur = node;
-        }
+      // check if we can delete delNodeHint
+      if( delNodeHint != null && (priSet.contains(delNodeHint) ||
+          (addedNode != null && !priSet.contains(addedNode))) ) {
+        cur = delNodeHint;
+      } else { // regular excess replica removal
+        Iterator<DatanodeDescriptor> iter = 
+          priSet.isEmpty() ? remains.iterator() : priSet.iterator();
+        while( iter.hasNext() ) {
+          DatanodeDescriptor node = iter.next();
+          long free = node.getRemaining();
+
+          if (minSpace > free) {
+            minSpace = free;
+            cur = node;
+          }
+        }
       }
 
       // adjust rackmap, priSet, and remains
@@ -2430,14 +2444,13 @@
       if(datanodes.isEmpty()) {
         rackMap.remove(rack);
       }
-      if (priSet.isEmpty()) {
-        remains.remove(cur);
-      } else {
-        priSet.remove(cur);
+      if( priSet.remove(cur) ) {
         if (datanodes.size() == 1) {
           priSet.remove(datanodes.get(0));
           remains.add(datanodes.get(0));
         }
+      } else {
+        remains.remove(cur);
       }
 
       nonExcess.remove(cur);
@@ -2515,7 +2528,8 @@
    * The given node is reporting that it received a certain block.
    */
   public synchronized void blockReceived(DatanodeID nodeID,  
-                                         Block block
+                                         Block block,
+                                         String delHint
                                          ) throws IOException {
     DatanodeDescriptor node = getDatanode(nodeID);
     if (node == null) {
@@ -2538,10 +2552,22 @@
       throw new DisallowedDatanodeException(node);
     }
 
+    // get the deletion hint node
+    DatanodeDescriptor delHintNode = null;
+    if(delHint!=null && delHint.length()!=0) {
+      delHintNode = datanodeMap.get(delHint);
+      if(delHintNode == null) {
+        NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
+            + block.getBlockName()
+            + " is expected to be removed from an unrecorded node " 
+            + delHint);
+      }
+    }
+
     //
     // Modify the blocks->datanode map and node's map.
     // 
-    addStoredBlock(block, node);
+    addStoredBlock(block, node, delHintNode );
     pendingReplications.remove(block);
   }
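
The hint is honored only when deleting that replica cannot reduce the number
of racks covered; paraphrasing the new branch in chooseExcessReplicates:

    // delete the hinted replica if its rack still holds another replica
    // (delNodeHint is in priSet), or if the newly added replica itself sits
    // on a single-replica rack (addedNode not in priSet); otherwise fall
    // back to the least-free-space choice, preferring priSet
    if (delNodeHint != null && (priSet.contains(delNodeHint) ||
        (addedNode != null && !priSet.contains(addedNode)))) {
      cur = delNodeHint;
    }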
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=591103&r1=591102&r2=591103&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Thu Nov  1 11:08:40 2007
@@ -596,12 +596,13 @@
   }
 
   public void blockReceived(DatanodeRegistration nodeReg, 
-                            Block blocks[]) throws IOException {
+                            Block blocks[],
+                            String delHints[]) throws IOException {
     verifyRequest(nodeReg);
     stateChangeLog.debug("*BLOCK* NameNode.blockReceived: "
                          +"from "+nodeReg.getName()+" "+blocks.length+" blocks.");
     for (int i = 0; i < blocks.length; i++) {
-      namesystem.blockReceived(nodeReg, blocks[i]);
+      namesystem.blockReceived(nodeReg, blocks[i], delHints[i]);
     }
   }
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java?rev=591103&r1=591102&r2=591103&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java Thu Nov  1 11:08:40 2007
@@ -109,19 +109,29 @@
     Path root = new Path(topdir);
     
     for (int idx = 0; idx < nFiles; idx++) {
-      Path fPath = new Path(root, files[idx].getName());
-      if (!fs.mkdirs(fPath.getParent())) {
-        throw new IOException("Mkdirs failed to create " + 
-                              fPath.getParent().toString());
-      }
-      FSDataOutputStream out = fs.create(fPath, replicationFactor);
-      byte[] toWrite = new byte[files[idx].getSize()];
-      Random rb = new Random(files[idx].getSeed());
-      rb.nextBytes(toWrite);
-      out.write(toWrite);
-      out.close();
-      toWrite = null;
+      createFile(fs, new Path(root, files[idx].getName()), files[idx].getSize(),
+          replicationFactor, files[idx].getSeed());
+    }
+  }
+  
+  static void createFile(FileSystem fs, Path fileName, long fileLen, 
+      short replFactor, long seed) throws IOException {
+    if (!fs.mkdirs(fileName.getParent())) {
+      throw new IOException("Mkdirs failed to create " + 
+                            fileName.getParent().toString());
+    }
+    FSDataOutputStream out = fs.create(fileName, replFactor);
+    byte[] toWrite = new byte[1024];
+    Random rb = new Random(seed);
+    long bytesToWrite = fileLen;
+    while (bytesToWrite>0) {
+      rb.nextBytes(toWrite);
+      int bytesToWriteNext = (1024 < bytesToWrite) ? 1024 : (int)bytesToWrite;
+
+      out.write(toWrite, 0, bytesToWriteNext);
+      bytesToWrite -= bytesToWriteNext;
     }
+    out.close();
   }
   
   /** check if the files have been copied correctly. */
@@ -168,33 +178,31 @@
     Path root = new Path(topdir);
 
     /** wait for the replication factor to settle down */
-    while (true) {
-      boolean good = true;
-      for (int idx = 0; idx < nFiles; idx++) {
-        Path fPath = new Path(root, files[idx].getName());
-        String locs[][] = fs.getFileCacheHints(fPath, 0, Long.MAX_VALUE);
-        for (int j = 0; j < locs.length; j++) {
-          String[] loc = locs[j];
-          if (loc.length != value) {
-            System.out.println("File " + fPath + " has replication factor " +
-                               loc.length);
-            good = false;
-            break;
-          }
-        }
-        if (!good) {
+    for (int idx = 0; idx < nFiles; idx++) {
+      waitReplication(fs, new Path(root, files[idx].getName()), value);
+    }
+  }
+  
+  /** wait for the file's replication to be done */
+  static void waitReplication(FileSystem fs, Path fileName, 
+      short replFactor)  throws IOException {
+    boolean good;
+    do {
+      good = true;
+      String locs[][] = fs.getFileCacheHints(fileName, 0, Long.MAX_VALUE);
+      for (int j = 0; j < locs.length; j++) {
+        String[] loc = locs[j];
+        if (loc.length != replFactor) {
+          System.out.println("File " + fileName + " has replication factor " +
+              loc.length);
+          try {
+            System.out.println("Waiting for replication factor to drain");
+            Thread.sleep(100);
+          } catch (InterruptedException e) {} 
           break;
         }
       }
-      if (!good) {
-        try {
-          System.out.println("Waiting for replication factor to drain");
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {} 
-        continue;
-      }
-      break;
-    }
+    } while(!good);
   }
   
   /** delete directory and everything underneath it.*/
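
The refactored helpers can now be driven per file; for example (mirroring
their use in TestBlockReplacement below):

    Path f = new Path("/tmp.txt");
    DFSTestUtil.createFile(fs, f, 1024L, (short)3, new Random().nextLong());
    DFSTestUtil.waitReplication(fs, f, (short)3);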

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java?rev=591103&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java Thu Nov  1 11:08:40 2007
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+import junit.framework.TestCase;
+/**
+ * This class tests whether block replacement requests to datanodes work correctly.
+ */
+public class TestBlockReplacement extends TestCase {
+  MiniDFSCluster cluster;
+  public void testThrottler() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set("fs.default.name", "localhost:0");
+    long bandwidthPerSec = 1024*1024L;
+    final long TOTAL_BYTES = 6*bandwidthPerSec;
+    long bytesToSend = TOTAL_BYTES; 
+    long start = FSNamesystem.now();
+    DataNode.Throttler throttler = new DataNode.Throttler(bandwidthPerSec);
+    // push TOTAL_BYTES through the throttler in three calls, then check the rate
+    long bytesSent = 1024*512L; // 0.5MB
+    throttler.throttle(bytesSent);
+    bytesToSend -= bytesSent;
+    bytesSent = 1024*768L; // 0.75MB
+    throttler.throttle(bytesSent);
+    bytesToSend -= bytesSent;
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ignored) {}
+    throttler.throttle(bytesToSend);
+    long end = FSNamesystem.now();
+    assertTrue(TOTAL_BYTES*1000/(end-start) <= bandwidthPerSec);
+  }
+  
+  public void testBlockReplacement() throws IOException {
+    final Configuration CONF = new Configuration();
+    final String[] INITIAL_RACKS = {"/RACK0", "/RACK1", "/RACK2"};
+    final String[] NEW_RACKS = {"/RACK2"};
+
+    final short REPLICATION_FACTOR = (short)3;
+    final int DEFAULT_BLOCK_SIZE = 1024;
+    final Random r = new Random();
+    
+    CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE/2);
+    CONF.setLong("dfs.blockreport.intervalMsec",500);
+    cluster = new MiniDFSCluster(
+          CONF, REPLICATION_FACTOR, true, INITIAL_RACKS );
+    try {
+      cluster.waitActive();
+      
+      FileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/tmp.txt");
+      
+      // create a file with one block
+      DFSTestUtil.createFile(fs, fileName,
+          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, r.nextLong());
+      DFSTestUtil.waitReplication(fs,fileName, REPLICATION_FACTOR);
+      
+      List<LocatedBlock> locatedBlocks = cluster.getNameNode().
+        getBlockLocations("/tmp.txt", 0, DEFAULT_BLOCK_SIZE).getLocatedBlocks();
+      assertEquals(1, locatedBlocks.size());
+      LocatedBlock block = locatedBlocks.get(0);
+      DatanodeInfo[]  oldNodes = block.getLocations();
+      assertEquals(3, oldNodes.length);
+      Block b = block.getBlock();
+      
+      // add a new datanode to the cluster
+      cluster.startDataNodes(CONF, 1, true, null, NEW_RACKS);
+      cluster.waitActive();
+      
+      // get all datanodes
+      InetSocketAddress addr = new InetSocketAddress("localhost",
+          cluster.getNameNodePort());
+      DFSClient client = new DFSClient(addr, CONF);
+      DatanodeInfo[] datanodes = client.datanodeReport(DatanodeReportType.ALL);
+
+      // find out the new node
+      DatanodeInfo newNode=null;
+      for(DatanodeInfo node:datanodes) {
+        boolean isNewNode = true;
+        for(DatanodeInfo oldNode:oldNodes) {
+          if(node.equals(oldNode)) {
+            isNewNode = false;
+            break;
+          }
+        }
+        if(isNewNode) {
+          newNode = node;
+          break;
+        }
+      }
+      
+      assertTrue(newNode!=null);
+      DatanodeInfo source=null;
+      ArrayList<DatanodeInfo> proxies = new ArrayList<DatanodeInfo>(2);
+      for(DatanodeInfo node:datanodes) {
+        if(node != newNode) {
+          if( node.getNetworkLocation().equals(newNode.getNetworkLocation())) {
+            source = node;
+          } else {
+            proxies.add( node );
+          }
+        }
+      }
+      assertTrue(source!=null && proxies.size()==2);
+      
+      // start to replace the block
+      // case 1: proxySource does not contain the block
+      assertFalse(replaceBlock(b, source, newNode, proxies.get(0)));
+      // case 2: destination contains the block
+      assertFalse(replaceBlock(b, source, proxies.get(0), proxies.get(1)));
+      // case 3: correct case
+      assertTrue(replaceBlock(b, source, proxies.get(0), newNode));
+      // block locations should contain two proxies and newNode
+      checkBlocks(source, fileName.toString(), 
+          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR);
+      // case 4: proxies.get(1) is not a valid del hint
+      assertTrue(replaceBlock(b, proxies.get(1), proxies.get(0), source));
+      /* block locations should contain two proxies and source;
+       * newNode was deleted.
+       */
+      checkBlocks(newNode, fileName.toString(), 
+          DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  /* wait until the file's blocks do not exist at excludedNode */
+  private void checkBlocks(DatanodeInfo excludedNode, String fileName, 
+      long fileLen, short replFactor) throws IOException {
+    boolean notDone;
+    do {
+      List<LocatedBlock> blocks = cluster.getNameNode().
+      getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
+      assertEquals(1, blocks.size());
+      DatanodeInfo[] nodes = blocks.get(0).getLocations();
+      notDone = (nodes.length != replFactor);
+      if(!notDone) {
+        for(DatanodeInfo node:nodes) {
+          if(node.equals(excludedNode)) {
+            notDone = true;
+            try {
+              Thread.sleep(100);
+            } catch(InterruptedException e) {
+            }
+            break;
+          }
+        }
+      }
+    } while(notDone);
+  }
+
+  /* Copy a block from sourceProxy to destination. If the block becomes
+   * overreplicated, preferably remove it from source.
+   * 
+   * Return true if a block is successfully copied; otherwise false.
+   */
+  private boolean replaceBlock( Block block, DatanodeInfo source,
+      DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
+    Socket sock = new Socket();
+    sock.connect(DataNode.createSocketAddr(
+        sourceProxy.getName()), FSConstants.READ_TIMEOUT);
+    sock.setSoTimeout(FSConstants.READ_TIMEOUT);
+    // sendRequest
+    DataOutputStream out = new DataOutputStream(sock.getOutputStream());
+    out.writeShort(FSConstants.DATA_TRANFER_VERSION);
+    out.writeByte(FSConstants.OP_COPY_BLOCK);
+    out.writeLong(block.getBlockId());
+    Text.writeString(out, source.getStorageID());
+    destination.write(out);
+    out.flush();
+    // receiveResponse
+    DataInputStream reply = new DataInputStream(sock.getInputStream());
+
+    short status = reply.readShort();
+    if(status == FSConstants.OP_STATUS_SUCCESS) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    (new TestBlockReplacement()).testBlockReplacement();
+  }
+
+}

Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL


