hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r811746 - in /hadoop/hdfs/branches/HDFS-265: ./ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/j...
Date Sun, 06 Sep 2009 02:02:35 GMT
Author: shv
Date: Sun Sep  6 02:02:32 2009
New Revision: 811746

URL: http://svn.apache.org/viewvc?rev=811746&view=rev
Log:
HDFS-576. Block report includes under-construction replicas. Contributed by Konstantin Shvachko.

Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockReport.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Sun Sep  6 02:02:32 2009
@@ -4,6 +4,8 @@
 
   INCOMPATIBLE CHANGES
 
+    HDFS-576. Block report includes under-construction replicas. (shv)
+
   NEW FEATURES
 
     HDFS-536. Support hflush at DFSClient. (hairong)

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java Sun Sep  6 02:02:32 2009
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 
 /**
  * This class provides an interface for accessing list of blocks that
@@ -25,41 +29,82 @@
  * This class is useful for block report. Rather than send block reports
  * as a Block[] we can send it as a long[].
  *
+ * The structure of the array is as follows:
+ * 0: the length of the finalized replica list;
+ * 1: the length of the under-construction replica list;
+ * - followed by finalized replica list where each replica is represented by
+ *   3 longs: one for the blockId, one for the block length, and one for
+ *   the generation stamp;
+ * - followed by the invalid replica represented with three -1s;
+ * - followed by the under-construction replica list where each replica is
+ *   represented by 4 longs: three for the block id, length, generation 
+ *   stamp, and the fourth for the replica state.
  */
-public class BlockListAsLongs implements Iterable<Block>{
+public class BlockListAsLongs implements Iterable<Block> {
   /**
-   * A block as 3 longs
+   * A finalized block as 3 longs
    *   block-id and block length and generation stamp
    */
-  private static final int LONGS_PER_BLOCK = 3;
-  
-  private static int index2BlockId(int index) {
-    return index*LONGS_PER_BLOCK;
-  }
-  private static int index2BlockLen(int index) {
-    return (index*LONGS_PER_BLOCK) + 1;
-  }
-  private static int index2BlockGenStamp(int index) {
-    return (index*LONGS_PER_BLOCK) + 2;
+  private static final int LONGS_PER_FINALIZED_BLOCK = 3;
+
+  /**
+   * An under-construction block as 4 longs
+   *   block-id and block length, generation stamp and replica state
+   */
+  private static final int LONGS_PER_UC_BLOCK = 4;
+
+  /** Number of longs in the header */
+  private static final int HEADER_SIZE = 2;
+
+  /**
+   * Returns the index of the first long in blockList
+   * belonging to the specified block.
+   * The first long contains the block id.
+   */
+  private int index2BlockId(int blockIndex) {
+    if(blockIndex < 0 || blockIndex > getNumberOfBlocks())
+      return -1;
+    int finalizedSize = getNumberOfFinalizedReplicas();
+    if(blockIndex < finalizedSize)
+      return HEADER_SIZE + blockIndex * LONGS_PER_FINALIZED_BLOCK;
+    return HEADER_SIZE + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
+            + (blockIndex - finalizedSize) * LONGS_PER_UC_BLOCK;
   }
-  
+
   private long[] blockList;
   
   /**
-   * Converting a block[] to a long[]
-   * @param blockArray - the input array block[]
-   * @return the output array of long[]
+   * Create block report from finalized and under construction lists of blocks.
+   * 
+   * @param finalized - list of finalized blocks
+   * @param uc - list of under construction blocks
    */
-  public static long[] convertToArrayLongs(final Block[] blockArray) {
-    long[] blocksAsLongs = new long[blockArray.length * LONGS_PER_BLOCK];
+  public BlockListAsLongs(final List<? extends Block> finalized,
+                          final List<ReplicaInfo> uc) {
+    int finalizedSize = finalized == null ? 0 : finalized.size();
+    int ucSize = uc == null ? 0 : uc.size();
+    int len = HEADER_SIZE
+              + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
+              + ucSize * LONGS_PER_UC_BLOCK;
+
+    blockList = new long[len];
+
+    // set the header
+    blockList[0] = finalizedSize;
+    blockList[1] = ucSize;
+
+    // set finalized blocks
+    for (int i = 0; i < finalizedSize; i++) {
+      setBlock(i, finalized.get(i));
+    }
 
-    BlockListAsLongs bl = new BlockListAsLongs(blocksAsLongs);
-    assert bl.getNumberOfBlocks() == blockArray.length;
+    // set invalid delimiting block
+    setDelimitingBlock(finalizedSize);
 
-    for (int i = 0; i < blockArray.length; i++) {
-      bl.setBlock(i, blockArray[i]);
+    // set under construction blocks
+    for (int i = 0; i < ucSize; i++) {
+      setBlock(finalizedSize + i, uc.get(i));
     }
-    return blocksAsLongs;
   }
 
   public BlockListAsLongs() {
@@ -72,27 +117,29 @@
    */
   public BlockListAsLongs(final long[] iBlockList) {
     if (iBlockList == null) {
-      blockList = new long[0];
-    } else {
-      if (iBlockList.length%LONGS_PER_BLOCK != 0) {
-        // must be multiple of LONGS_PER_BLOCK
-        throw new IllegalArgumentException();
-      }
-      blockList = iBlockList;
+      blockList = new long[HEADER_SIZE];
+      return;
     }
+    blockList = iBlockList;
+  }
+
+  public long[] getBlockListAsLongs() {
+    return blockList;
   }
 
   /**
    * Iterates over blocks in the block report.
    * Avoids object allocation on each iteration.
    */
-  private class BlockReportIterator implements Iterator<Block> {
+  public class BlockReportIterator implements Iterator<Block> {
     private int currentBlockIndex;
     private Block block;
+    private ReplicaState currentReplicaState;
 
     BlockReportIterator() {
       this.currentBlockIndex = 0;
       this.block = new Block();
+      this.currentReplicaState = null;
     }
 
     public boolean hasNext() {
@@ -100,22 +147,39 @@
     }
 
     public Block next() {
-      block.set(blockList[index2BlockId(currentBlockIndex)],
-                blockList[index2BlockLen(currentBlockIndex)],
-                blockList[index2BlockGenStamp(currentBlockIndex)]);
+      block.set(blockId(currentBlockIndex),
+                blockLength(currentBlockIndex),
+                blockGenerationStamp(currentBlockIndex));
+      currentReplicaState = blockReplicaState(currentBlockIndex);
       currentBlockIndex++;
       return block;
     }
 
-    public void remove()  {
+    public void remove() {
       throw new UnsupportedOperationException("Sorry. can't remove.");
     }
+
+    /**
+     * Get the state of the current replica.
+     * The state corresponds to the replica returned
+     * by the latest {@link #next()}. 
+     */
+    public ReplicaState getCurrentReplicaState() {
+      return currentReplicaState;
+    }
   }
 
   /**
    * Returns an iterator over blocks in the block report. 
    */
   public Iterator<Block> iterator() {
+    return getBlockReportIterator();
+  }
+
+  /**
+   * Returns {@link BlockReportIterator}. 
+   */
+  public BlockReportIterator getBlockReportIterator() {
     return new BlockReportIterator();
   }
 
@@ -124,7 +188,55 @@
    * @return - the number of blocks
    */
   public int getNumberOfBlocks() {
-    return blockList.length/LONGS_PER_BLOCK;
+    assert blockList.length == HEADER_SIZE + 
+            (blockList[0] + 1) * LONGS_PER_FINALIZED_BLOCK +
+            blockList[1] * LONGS_PER_UC_BLOCK :
+              "Number of blocks is inconcistent with the array length";
+    return getNumberOfFinalizedReplicas() + getNumberOfUCReplicas();
+  }
+
+  /**
+   * Returns the number of finalized replicas in the block report.
+   */
+  private int getNumberOfFinalizedReplicas() {
+    return (int)blockList[0];
+  }
+
+  /**
+   * Returns the number of under construction replicas in the block report.
+   */
+  private int getNumberOfUCReplicas() {
+    return (int)blockList[1];
+  }
+
+  /**
+   * Returns the id of the specified replica of the block report.
+   */
+  private long blockId(int index) {
+    return blockList[index2BlockId(index)];
+  }
+
+  /**
+   * Returns the length of the specified replica of the block report.
+   */
+  private long blockLength(int index) {
+    return blockList[index2BlockId(index) + 1];
+  }
+
+  /**
+   * Returns the generation stamp of the specified replica of the block report.
+   */
+  private long blockGenerationStamp(int index) {
+    return blockList[index2BlockId(index) + 2];
+  }
+
+  /**
+   * Returns the state of the specified replica of the block report.
+   */
+  private ReplicaState blockReplicaState(int index) {
+    if(index < getNumberOfFinalizedReplicas())
+      return ReplicaState.FINALIZED;
+    return ReplicaState.getState((int)blockList[index2BlockId(index) + 3]);
   }
 
   /**
@@ -134,7 +246,7 @@
    */
   @Deprecated
   public long getBlockId(final int index)  {
-    return blockList[index2BlockId(index)];
+    return blockId(index);
   }
   
   /**
@@ -144,7 +256,7 @@
    */
   @Deprecated
   public long getBlockLen(final int index)  {
-    return blockList[index2BlockLen(index)];
+    return blockLength(index);
   }
 
   /**
@@ -154,7 +266,7 @@
    */
   @Deprecated
   public long getBlockGenStamp(final int index)  {
-    return blockList[index2BlockGenStamp(index)];
+    return blockGenerationStamp(index);
   }
   
   /**
@@ -162,9 +274,28 @@
    * @param index - the index of the block to set
    * @param b - the block is set to the value of the this block
    */
-  private void setBlock(final int index, final Block b) {
-    blockList[index2BlockId(index)] = b.getBlockId();
-    blockList[index2BlockLen(index)] = b.getNumBytes();
-    blockList[index2BlockGenStamp(index)] = b.getGenerationStamp();
+  private <T extends Block> void setBlock(final int index, final T b) {
+    int pos = index2BlockId(index);
+    blockList[pos] = b.getBlockId();
+    blockList[pos + 1] = b.getNumBytes();
+    blockList[pos + 2] = b.getGenerationStamp();
+    if(index < getNumberOfFinalizedReplicas())
+      return;
+    assert ((ReplicaInfo)b).getState() != ReplicaState.FINALIZED :
+      "Must be under-construction replica.";
+    blockList[pos + 3] = ((ReplicaInfo)b).getState().getValue();
+  }
+
+  /**
+   * Set the invalid delimiting block between the finalized and
+   * the under-construction lists.
+   * The invalid block has all three fields set to -1.
+   * @param finalizedSzie - the size of the finalized list
+   */
+  private void setDelimitingBlock(final int finalizedSzie) {
+    int idx = HEADER_SIZE + finalizedSzie * LONGS_PER_FINALIZED_BLOCK;
+    blockList[idx] = -1;
+    blockList[idx+1] = -1;
+    blockList[idx+2] = -1;
   }
 }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Sun Sep  6 02:02:32 2009
@@ -86,15 +86,29 @@
    */
   static public enum ReplicaState {
     /** Replica is finalized. The state when replica is not modified. */
-    FINALIZED,
+    FINALIZED(0),
     /** Replica is being written to. */
-    RBW,
+    RBW(1),
     /** Replica is waiting to be recovered. */
-    RWR,
+    RWR(2),
     /** Replica is under recovery. */
-    RUR,
+    RUR(3),
     /** Temporary replica: created for replication and relocation only. */
-    TEMPORARY;
+    TEMPORARY(4);
+
+    private int value;
+
+    private ReplicaState(int v) {
+      value = v;
+    }
+
+    public int getValue() {
+      return value;
+    }
+
+    public static ReplicaState getState(int v) {
+      return ReplicaState.values()[v];
+    }
   }
 
   /**

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Sun Sep  6 02:02:32 2009
@@ -29,11 +29,11 @@
 import java.io.PrintStream;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
 import java.util.TreeSet;
 import java.util.regex.Matcher;
@@ -211,8 +211,8 @@
   private void init() {
     
     // get the list of blocks and arrange them in random order
-    Block arr[] = dataset.getBlockReport();
-    Collections.shuffle(Arrays.asList(arr));
+    List<Block> arr = dataset.getFinalizedBlocks();
+    Collections.shuffle(arr);
     
     blockInfoSet = new TreeSet<BlockScanInfo>();
     blockMap = new HashMap<Block, BlockScanInfo>();

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sun Sep  6 02:02:32 2009
@@ -989,13 +989,12 @@
       // and can be safely GC'ed.
       //
       long brStartTime = now();
-      Block[] bReport = data.getBlockReport();
+      BlockListAsLongs bReport = data.getBlockReport();
 
-      cmd = namenode.blockReport(dnRegistration,
-              BlockListAsLongs.convertToArrayLongs(bReport));
+      cmd = namenode.blockReport(dnRegistration, bReport.getBlockListAsLongs());
       long brTime = now() - brStartTime;
       myMetrics.blockReports.inc(brTime);
-      LOG.info("BlockReport of " + bReport.length +
+      LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
           " blocks got processed in " + brTime + " msecs");
       //
       // If we have sent the first block report, then wait a random

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sun Sep  6 02:02:32 2009
@@ -41,6 +41,7 @@
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -1335,20 +1336,34 @@
   }
 
   /**
-   * Return finalized blocks from the in-memory blockmap
+   * Generates a block report from the in-memory block map.
    */
-  public Block[] getBlockReport() {
-    ArrayList<Block> list =  new ArrayList<Block>(volumeMap.size());
+  public BlockListAsLongs getBlockReport() {
+    ArrayList<ReplicaInfo> finalized =
+      new ArrayList<ReplicaInfo>(volumeMap.size());
+    ArrayList<ReplicaInfo> uc = new ArrayList<ReplicaInfo>();
     synchronized(this) {
       for (ReplicaInfo b : volumeMap.replicas()) {
-        if (b.getState() == ReplicaState.FINALIZED ) {
-          list.add(new Block(b));
-        } else if (supportAppends && b.getState() == ReplicaState.RWR) {
-          list.add(new Block(b));
+        switch(b.getState()) {
+        case FINALIZED:
+          finalized.add(b);
+          break;
+        case RBW:
+        case RWR:
+          uc.add(b);
+          break;
+        case RUR:
+          ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
+          uc.add(rur.getOriginalReplica());
+          break;
+        case TEMPORARY:
+          break;
+        default:
+          assert false : "Illegal ReplicaInfo state.";
         }
       }
+      return new BlockListAsLongs(finalized, uc);
     }
-    return list.toArray(new Block[list.size()]);
   }
 
   /**
@@ -1368,6 +1383,19 @@
   }
 
   /**
+   * Get the list of finalized blocks from in-memory blockmap.
+   */
+  synchronized List<Block> getFinalizedBlocks() {
+    ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size());
+    for (ReplicaInfo b : volumeMap.replicas()) {
+      if(b.getState() == ReplicaState.FINALIZED) {
+        finalized.add(new Block(b));
+      }
+    }
+    return finalized;
+  }
+
+  /**
    * Check whether the given block is a valid one.
    * valid means finalized
    */

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sun Sep  6 02:02:32 2009
@@ -29,6 +29,7 @@
 
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
@@ -225,7 +226,7 @@
    * Returns the block report - the full list of blocks stored
    * @return - the block report - the full list of blocks stored
    */
-  public Block[] getBlockReport();
+  public BlockListAsLongs getBlockReport();
 
   /**
    * Is the block valid?

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java Sun Sep  6 02:02:32 2009
@@ -54,7 +54,7 @@
   }
   
   @Override  // ReplicaInfo
-  ReplicaState getState() {
+  public ReplicaState getState() {
     return ReplicaState.FINALIZED;
   }
   

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java Sun Sep  6 02:02:32 2009
@@ -72,7 +72,7 @@
   }
 
   @Override   //ReplicaInfo
-  ReplicaState getState() {
+  public ReplicaState getState() {
     return ReplicaState.RBW;
   }
   

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Sun Sep  6 02:02:32 2009
@@ -92,7 +92,7 @@
   }
   
   @Override  //ReplicaInfo
-  ReplicaState getState() {
+  public ReplicaState getState() {
     return ReplicaState.TEMPORARY;
   }
   

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Sun Sep  6 02:02:32 2009
@@ -33,7 +33,7 @@
  * This class is used by datanodes to maintain meta data of its replicas.
  * It provides a general interface for meta information of a replica.
  */
-abstract class ReplicaInfo extends Block {
+abstract public class ReplicaInfo extends Block {
   private FSVolume volume;      // volume where the replica belongs
   private File     dir;         // directory where block & meta files belong
 
@@ -134,7 +134,7 @@
    * Get the replica state
    * @return the replica state
    */
-  abstract ReplicaState getState();
+  abstract public ReplicaState getState();
   
   /**
    * check if this replica has already detached.

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java Sun Sep  6 02:02:32 2009
@@ -96,7 +96,7 @@
   }
   
   @Override //ReplicaInfo
-  ReplicaState getState() {
+  public ReplicaState getState() {
     return ReplicaState.RUR;
   }
   

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java Sun Sep  6 02:02:32 2009
@@ -60,7 +60,7 @@
   }
   
   @Override //ReplicaInfo
-  ReplicaState getState() {
+  public ReplicaState getState() {
     return ReplicaState.RWR;
   }
   

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java Sun Sep  6 02:02:32 2009
@@ -32,17 +32,10 @@
   private BlockUCState blockUCState;
 
   /**
-   * Block replica locations as assigned when the block was allocated.
+   * Block replicas as assigned when the block was allocated.
    * This defines the pipeline order.
-   * It is not guaranteed that data-nodes actually have corresponding replicas.
    */
-  private DatanodeDescriptor[] locations;
-
-  /**
-   * Block replica states.
-   * Replica at locations[i] has state replicaStates[i].
-   */
-  private ReplicaState[] replicaStates;
+  private ReplicaUnderConstruction[] replicas;
 
   /** A data-node responsible for block recovery. */
   private int primaryNodeIndex = -1;
@@ -51,6 +44,62 @@
   private long lastRecoveryTime = 0;
 
   /**
+   * ReplicaUnderConstruction contains information about replicas while
+   * they are under construction.
+   * The GS, the length and the state of the replica are as reported by
+   * the data-node.
+   * It is not guaranteed, but expected, that data-nodes actually have
+   * corresponding replicas.
+   */
+  static class ReplicaUnderConstruction extends Block {
+    private DatanodeDescriptor expectedLocation;
+    private ReplicaState state;
+
+    ReplicaUnderConstruction(Block block,
+                             DatanodeDescriptor target,
+                             ReplicaState state) {
+      super(block);
+      this.expectedLocation = target;
+      this.state = state;
+    }
+
+    /**
+     * Expected block replica location as assigned when the block was allocated.
+     * This defines the pipeline order.
+     * It is not guaranteed, but expected, that the data-node actually has
+     * the replica.
+     */
+    DatanodeDescriptor getExpectedLocation() {
+      return expectedLocation;
+    }
+
+    /**
+     * Get replica state as reported by the data-node.
+     */
+    ReplicaState getState() {
+      return state;
+    }
+
+    /**
+     * Returns whether the data-node the replica belongs to is alive.
+     */
+    boolean isAlive() {
+      return expectedLocation.isAlive;
+    }
+
+    @Override // Block
+    public int hashCode() {
+      return super.hashCode();
+    }
+
+    @Override // Block
+    public boolean equals(Object obj) {
+      // Sufficient to rely on super's implementation
+      return (this == obj) || super.equals(obj);
+    }
+  }
+
+  /**
    * Create block and set its state to
    * {@link BlockUCState#UNDER_CONSTRUCTION}.
    */
@@ -87,19 +136,27 @@
   }
 
   void setLocations(DatanodeDescriptor[] targets) {
-    this.locations = targets;
     int numLocations = targets == null ? 0 : targets.length;
-    replicaStates = new ReplicaState[numLocations];
+    this.replicas = new ReplicaUnderConstruction[numLocations];
     for(int i = 0; i < numLocations; i++)
-      replicaStates[i] = ReplicaState.RBW;
+      replicas[i] =
+        new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW);
   }
 
-  DatanodeDescriptor[] getLocations() {
+  /**
+   * Create array of expected replica locations
+   * (as has been assigned by chooseTargets()).
+   */
+  private DatanodeDescriptor[] getExpectedLocations() {
+    int numLocations = replicas == null ? 0 : replicas.length;
+    DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations];
+    for(int i = 0; i < numLocations; i++)
+      locations[i] = replicas[i].getExpectedLocation();
     return locations;
   }
 
   int getNumLocations() {
-    return locations == null ? 0 : locations.length;
+    return replicas == null ? 0 : replicas.length;
   }
 
   /**
@@ -134,18 +191,19 @@
    * Find the first alive data-node starting from the previous primary.
    */
   void assignPrimaryDatanode() {
-    if (locations.length == 0) {
+    if (replicas.length == 0) {
       NameNode.stateChangeLog.warn("BLOCK*"
         + " INodeFileUnderConstruction.initLeaseRecovery:"
         + " No blocks found, lease removed.");
     }
 
     int previous = primaryNodeIndex;
-    for(int i = 1; i <= locations.length; i++) {
-      int j = (previous + i)%locations.length;
-      if (locations[j].isAlive) {
-        DatanodeDescriptor primary = locations[primaryNodeIndex = j]; 
-        primary.addBlockToBeRecovered(this, locations);
+    for(int i = 1; i <= replicas.length; i++) {
+      int j = (previous + i)%replicas.length;
+      if (replicas[j].isAlive()) {
+        primaryNodeIndex = j;
+        DatanodeDescriptor primary = replicas[j].getExpectedLocation(); 
+        primary.addBlockToBeRecovered(this, getExpectedLocations());
         NameNode.stateChangeLog.info("BLOCK* " + this
           + " recovery started, primary=" + primary);
         return;

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Sun Sep  6 02:02:32 2009
@@ -25,6 +25,8 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.io.Text;
@@ -391,13 +393,28 @@
     // scan the report and collect newly reported blocks
     // Note we are taking special precaution to limit tmp blocks allocated
     // as part this block report - which why block list is stored as longs
-    for (Block iblk : newReport) {
+    BlockReportIterator itBR = newReport.getBlockReportIterator();
+    Block iblk = null;
+    ReplicaState iState;
+    while(itBR.hasNext()) {
+      iblk = itBR.next();
+      iState = itBR.getCurrentReplicaState();
       BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
       if(storedBlock == null) {
         // If block is not in blocksMap it does not belong to any file
         toInvalidate.add(new Block(iblk));
         continue;
       }
+      switch(iState) {
+      case FINALIZED:
+      case RWR:
+        break;
+      case RBW: // ignore these replicas for now to provide
+      case RUR: // compatibility with current block report processing
+      case TEMPORARY:
+      default:
+        continue;
+      }
       if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
         // if the size differs from what is in the blockmap, then return
         // the new block. addStoredBlock will then pick up the right size of this

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Sun Sep  6 02:02:32 2009
@@ -35,10 +35,9 @@
  **********************************************************************/
 public interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 20: SendHeartbeat may return KeyUpdateCommand
-   *     Register returns access keys inside DatanodeRegistration object
+   * 21: blockReport() includes under-construction replicas.
    */
-  public static final long versionID = 20L;
+  public static final long versionID = 21L;
   
   // error code
   final static int NOTIFY = 0;

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java Sun Sep  6 02:02:32 2009
@@ -21,6 +21,7 @@
 import java.net.UnknownHostException;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
+import java.util.Arrays;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -200,7 +201,7 @@
           }
           for (int i = 1; i <= replication; ++i) { 
             // inject blocks for dn_i into dn_i and replica in dn_i's neighbors 
-            mc.injectBlocks((i_dn + i- 1)% numDataNodes, blocks);
+            mc.injectBlocks((i_dn + i- 1)% numDataNodes, Arrays.asList(blocks));
             System.out.println("Injecting blocks of dn " + i_dn  + " into dn" + 
                 ((i_dn + i- 1)% numDataNodes));
           }

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Sun Sep  6 02:02:32 2009
@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -855,7 +856,7 @@
    * @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes()
    * @return the block report for the specified data node
    */
-  public Block[] getBlockReport(int dataNodeIndex) {
+  public Iterable<Block> getBlockReport(int dataNodeIndex) {
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
@@ -866,11 +867,11 @@
   /**
    * 
    * @return block reports from all data nodes
-   *    Block[] is indexed in the same order as the list of datanodes returned by getDataNodes()
+   *    The returned array is indexed in the same order as the list of datanodes returned by getDataNodes()
    */
-  public Block[][] getAllBlockReports() {
+  public Iterable<Block>[] getAllBlockReports() {
     int numDataNodes = dataNodes.size();
-    Block[][] result = new Block[numDataNodes][];
+    Iterable<Block>[] result = new BlockListAsLongs[numDataNodes];
     for (int i = 0; i < numDataNodes; ++i) {
      result[i] = getBlockReport(i);
     }
@@ -887,7 +888,7 @@
    *             if any of blocks already exist in the data node
    *   
    */
-  public void injectBlocks(int dataNodeIndex, Block[] blocksToInject) throws IOException {
+  public void injectBlocks(int dataNodeIndex, Iterable<Block> blocksToInject) throws IOException {
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
@@ -909,7 +910,7 @@
    *             if any of blocks already exist in the data nodes
    *             Note the rest of the blocks are not injected.
    */
-  public void injectBlocks(Block[][] blocksToInject) throws IOException {
+  public void injectBlocks(Iterable<Block>[] blocksToInject) throws IOException {
     if (blocksToInject.length >  dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockReport.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockReport.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockReport.java Sun Sep  6 02:02:32 2009
@@ -102,15 +102,16 @@
     // mock with newly created blocks
     // I can't use DFSTestUtil.getAllBlocks(fs.open(filePath)) because it
     // will keep the file open which will prevent the effect of the test
-    Block[] blocks = locatedToBlocks(cluster.getNameNode().getBlockLocations(
+    ArrayList<Block> blocks = 
+      locatedToBlocks(cluster.getNameNode().getBlockLocations(
         filePath.toString(), FILE_START,
         AppendTestUtil.FILE_SIZE).getLocatedBlocks(), null);
 
-    LOG.info("Number of blocks allocated " + blocks.length);
-    int[] newLengths = new int[blocks.length];
+    LOG.info("Number of blocks allocated " + blocks.size());
+    int[] newLengths = new int[blocks.size()];
     int tempLen;
-    for (int i = 0; i < blocks.length; i++) {
-      Block b = blocks[i];
+    for (int i = 0; i < blocks.size(); i++) {
+      Block b = blocks.get(i);
       LOG.debug("Block " + b.getBlockName() + " before\t" + "Size " +
           b.getNumBytes());
       LOG.debug("Setting new length");
@@ -122,7 +123,7 @@
     }
     cluster.getNameNode().blockReport(
         cluster.listDataNodes()[DN_N0].dnRegistration,
-        BlockListAsLongs.convertToArrayLongs(blocks));
+        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
     List<LocatedBlock> blocksAfterReport =
         DFSTestUtil.getAllBlocks(fs.open(filePath));
@@ -174,7 +175,7 @@
     for (Integer aRemovedIndex : removedIndex) {
       blocks2Remove.add(lBlocks.get(aRemovedIndex).getBlock());
     }
-    Block[] blocks = locatedToBlocks(lBlocks, removedIndex);
+    ArrayList<Block> blocks = locatedToBlocks(lBlocks, removedIndex);
 
     LOG.debug("Number of blocks allocated " + lBlocks.size());
 
@@ -195,7 +196,7 @@
 
     cluster.getNameNode().blockReport(
         cluster.listDataNodes()[DN_N0].dnRegistration,
-        BlockListAsLongs.convertToArrayLongs(blocks));
+        new BlockListAsLongs(blocks, null).getBlockListAsLongs());
 
     cluster.getNamesystem().computeDatanodeWork();
 
@@ -217,8 +218,8 @@
         blocks2Remove.size(), cluster.getNamesystem().getUnderReplicatedBlocks());
   }
 
-  private Block[] locatedToBlocks(final List<LocatedBlock> locatedBlks,
-                                  List<Integer> positionsToRemove) {
+  private ArrayList<Block> locatedToBlocks(final List<LocatedBlock> locatedBlks,
+                                           List<Integer> positionsToRemove) {
     int substructLen = 0;
     if (positionsToRemove != null) { // Need to allocated smaller array
       substructLen = positionsToRemove.size();
@@ -232,7 +233,7 @@
       }
       newList.add(locatedBlks.get(i).getBlock());
     }
-    return newList.toArray(ret);
+    return newList;
   }
 
   private List<File> findAllFiles(File top, FilenameFilter mask) {

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java Sun Sep  6 02:02:32 2009
@@ -1094,7 +1094,7 @@
   static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
     List<File> files = new ArrayList<File>();
     List<DataNode> datanodes = cluster.getDataNodes();
-    Block[][] blocks = cluster.getAllBlockReports();
+    Iterable<Block>[] blocks = cluster.getAllBlockReports();
     for(int i = 0; i < blocks.length; i++) {
       FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset();
       for(Block b : blocks[i]) {

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java Sun Sep  6 02:02:32 2009
@@ -151,7 +151,7 @@
       waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, 20);
 
       
-      Block[][] blocksList = cluster.getAllBlockReports();
+      Iterable<Block>[] blocksList = cluster.getAllBlockReports();
                     
       
       cluster.shutdown();
@@ -174,15 +174,14 @@
       cluster.waitActive();
       Set<Block> uniqueBlocks = new HashSet<Block>();
       for (int i=0; i<blocksList.length; ++i) {
-        for (int j=0; j < blocksList[i].length; ++j) {
-          uniqueBlocks.add(blocksList[i][j]);
+        for (Block b : blocksList[i]) {
+          uniqueBlocks.add(new Block(b));
         }
       }
       // Insert all the blocks in the first data node
       
       LOG.info("Inserting " + uniqueBlocks.size() + " blocks");
-      Block[] blocks = uniqueBlocks.toArray(new Block[uniqueBlocks.size()]);
-      cluster.injectBlocks(0, blocks);
+      cluster.injectBlocks(0, uniqueBlocks);
       
       dfsClient = new DFSClient(new InetSocketAddress("localhost",
                                   cluster.getNameNodePort()),

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Sun Sep  6 02:02:32 2009
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
@@ -169,7 +170,8 @@
     cluster.waitActive();
     client = DFSClient.createNamenode(conf);
 
-    cluster.injectBlocks(blocksDN);
+    for(int i = 0; i < blocksDN.length; i++)
+      cluster.injectBlocks(i, Arrays.asList(blocksDN[i]));
 
     long totalCapacity = 0L;
     for(long capacity:capacities) {

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sun Sep  6 02:02:32 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Random;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.metrics.util.MBeanUtil;
@@ -303,10 +305,12 @@
     blockMap = new HashMap<Block,BInfo>(); 
   }
 
-  public synchronized void injectBlocks(Block[] injectBlocks)
+  public synchronized void injectBlocks(Iterable<Block> injectBlocks)
                                             throws IOException {
     if (injectBlocks != null) {
+      int numInjectedBlocks = 0;
       for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
+        numInjectedBlocks++;
         if (b == null) {
           throw new NullPointerException("Null blocks in block list");
         }
@@ -315,12 +319,12 @@
         }
       }
       HashMap<Block, BInfo> oldBlockMap = blockMap;
-      blockMap = 
-          new HashMap<Block,BInfo>(injectBlocks.length + oldBlockMap.size());
+      blockMap = new HashMap<Block,BInfo>(
+          numInjectedBlocks + oldBlockMap.size());
       blockMap.putAll(oldBlockMap);
       for (Block b: injectBlocks) {
           BInfo binfo = new BInfo(b, false);
-          blockMap.put(b, binfo);
+          blockMap.put(binfo.theBlock, binfo);
       }
     }
   }
@@ -340,7 +344,7 @@
     }
   }
 
-  public synchronized Block[] getBlockReport() {
+  public synchronized BlockListAsLongs getBlockReport() {
     Block[] blockTable = new Block[blockMap.size()];
     int count = 0;
     for (BInfo b : blockMap.values()) {
@@ -351,7 +355,8 @@
     if (count != blockTable.length) {
       blockTable = Arrays.copyOf(blockTable, count);
     }
-    return blockTable;
+    return new BlockListAsLongs(
+        new ArrayList<Block>(Arrays.asList(blockTable)), null);
   }
 
   public long getCapacity() throws IOException {
@@ -463,7 +468,7 @@
       return binfo;
     }
     binfo = new BInfo(b, true);
-    blockMap.put(b, binfo);
+    blockMap.put(binfo.theBlock, binfo);
     return binfo;
   }
 
@@ -479,7 +484,7 @@
             " is being written, and cannot be written to.");
     }
     BInfo binfo = new BInfo(b, true);
-    blockMap.put(b, binfo);
+    blockMap.put(binfo.theBlock, binfo);
     return binfo;
   }
 

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Sun Sep  6 02:02:32 2009
@@ -31,13 +31,11 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -116,8 +114,8 @@
     
     // make sure a block report is sent 
     DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
-    cluster.getNameNode().blockReport(dn.dnRegistration,
-        BlockListAsLongs.convertToArrayLongs(cluster.getBlockReport(1)));
+    long[] bReport = dn.getFSDataset().getBlockReport().getBlockListAsLongs();
+    cluster.getNameNode().blockReport(dn.dnRegistration, bReport);
 
     // verify number of blocks and files...
     verify(filename, filesize);

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Sun Sep  6 02:02:32 2009
@@ -25,6 +25,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
@@ -145,24 +146,24 @@
 
 
   public void testGetBlockReport() throws IOException {
-    FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); 
-    Block[] blockReport = fsdataset.getBlockReport();
-    assertEquals(0, blockReport.length);
+    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf); 
+    BlockListAsLongs blockReport = fsdataset.getBlockReport();
+    assertEquals(0, blockReport.getNumberOfBlocks());
     int bytesAdded = addSomeBlocks(fsdataset);
     blockReport = fsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS, blockReport.length);
+    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
       assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
     }
   }
   public void testInjectionEmpty() throws IOException {
-    FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); 
-    Block[] blockReport = fsdataset.getBlockReport();
-    assertEquals(0, blockReport.length);
+    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf); 
+    BlockListAsLongs blockReport = fsdataset.getBlockReport();
+    assertEquals(0, blockReport.getNumberOfBlocks());
     int bytesAdded = addSomeBlocks(fsdataset);
     blockReport = fsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS, blockReport.length);
+    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
       assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
@@ -175,7 +176,7 @@
     SimulatedFSDataset sfsdataset = new SimulatedFSDataset(conf);
     sfsdataset.injectBlocks(blockReport);
     blockReport = sfsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS, blockReport.length);
+    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
       assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
@@ -186,13 +187,13 @@
   }
 
   public void testInjectionNonEmpty() throws IOException {
-    FSDatasetInterface fsdataset = new SimulatedFSDataset(conf); 
+    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf); 
     
-    Block[] blockReport = fsdataset.getBlockReport();
-    assertEquals(0, blockReport.length);
+    BlockListAsLongs blockReport = fsdataset.getBlockReport();
+    assertEquals(0, blockReport.getNumberOfBlocks());
     int bytesAdded = addSomeBlocks(fsdataset);
     blockReport = fsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS, blockReport.length);
+    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
       assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
@@ -207,13 +208,13 @@
     // Add come blocks whose block ids do not conflict with
     // the ones we are going to inject.
     bytesAdded += addSomeBlocks(sfsdataset, NUMBLOCKS+1);
-    Block[] blockReport2 = sfsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS, blockReport.length);
+    BlockListAsLongs blockReport2 = sfsdataset.getBlockReport();
+    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     blockReport2 = sfsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS, blockReport.length);
+    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
     sfsdataset.injectBlocks(blockReport);
     blockReport = sfsdataset.getBlockReport();
-    assertEquals(NUMBLOCKS*2, blockReport.length);
+    assertEquals(NUMBLOCKS*2, blockReport.getNumberOfBlocks());
     for (Block b: blockReport) {
       assertNotNull(b);
       assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=811746&r1=811745&r2=811746&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Sun Sep  6 02:02:32 2009
@@ -686,8 +686,9 @@
     
     NamespaceInfo nsInfo;
     DatanodeRegistration dnRegistration;
-    Block[] blocks;
+    ArrayList<Block> blocks;
     int nrBlocks; // actual number of blocks
+    long[] blockReportList;
 
     /**
      * Get data-node in the form 
@@ -706,7 +707,7 @@
 
     TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
       dnRegistration = new DatanodeRegistration(getNodeName(dnIdx));
-      this.blocks = new Block[blockCapacity];
+      this.blocks = new ArrayList<Block>(blockCapacity);
       this.nrBlocks = 0;
     }
 
@@ -739,19 +740,24 @@
     }
 
     boolean addBlock(Block blk) {
-      if(nrBlocks == blocks.length) {
-        LOG.debug("Cannot add block: datanode capacity = " + blocks.length);
+      if(nrBlocks == blocks.size()) {
+        LOG.debug("Cannot add block: datanode capacity = " + blocks.size());
         return false;
       }
-      blocks[nrBlocks] = blk;
+      blocks.set(nrBlocks, blk);
       nrBlocks++;
       return true;
     }
 
     void formBlockReport() {
       // fill remaining slots with blocks that do not exist
-      for(int idx = blocks.length-1; idx >= nrBlocks; idx--)
-        blocks[idx] = new Block(blocks.length - idx, 0, 0);
+      for(int idx = blocks.size()-1; idx >= nrBlocks; idx--)
+        blocks.set(idx, new Block(blocks.size() - idx, 0, 0));
+      blockReportList = new BlockListAsLongs(blocks,null).getBlockListAsLongs();
+    }
+
+    long[] getBlockReportList() {
+      return blockReportList;
     }
 
     public int compareTo(String name) {
@@ -761,6 +767,7 @@
     /**
      * Send a heartbeat to the name-node and replicate blocks if requested.
      */
+    @SuppressWarnings("unused") // keep it for future blockReceived benchmark
     int replicateBlocks() throws IOException {
       // register datanode
       DatanodeCommand[] cmds = nameNode.sendHeartbeat(
@@ -928,8 +935,7 @@
       assert daemonId < numThreads : "Wrong daemonId.";
       TinyDatanode dn = datanodes[daemonId];
       long start = System.currentTimeMillis();
-      nameNode.blockReport(dn.dnRegistration,
-          BlockListAsLongs.convertToArrayLongs(dn.blocks));
+      nameNode.blockReport(dn.dnRegistration, dn.getBlockReportList());
       long end = System.currentTimeMillis();
       return end-start;
     }



Mime
View raw message