hadoop-hdfs-commits mailing list archives

From hair...@apache.org
Subject svn commit: r820497 [4/7] - in /hadoop/hdfs/trunk: ./ .eclipse.templates/.launches/ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apach...
Date Wed, 30 Sep 2009 23:39:33 GMT
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
+import org.apache.hadoop.io.IOUtils;
+
+/** 
+ * This class defines a replica in a pipeline, which includes
+ * a persistent replica being written to by a dfs client, or
+ * a temporary replica being replicated by a source datanode or
+ * being copied for balancing purposes.
+ * 
+ * The base class implements a temporary replica.
+ */
+class ReplicaInPipeline extends ReplicaInfo
+                        implements ReplicaInPipelineInterface {
+  private long bytesAcked;
+  private long bytesOnDisk;
+  private Thread writer;
+  
+  /**
+   * Constructor for a zero length replica
+   * @param blockId block id
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaInPipeline(long blockId, long genStamp, 
+      FSVolume vol, File dir) {
+    this( blockId, 0L, genStamp, vol, dir, Thread.currentThread());
+  }
+
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param writer a thread that is writing to this replica
+   */
+  ReplicaInPipeline(Block block, 
+      FSVolume vol, File dir, Thread writer) {
+    this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
+        vol, dir, writer);
+  }
+
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param writer a thread that is writing to this replica
+   */
+  ReplicaInPipeline(long blockId, long len, long genStamp,
+      FSVolume vol, File dir, Thread writer ) {
+    super( blockId, len, genStamp, vol, dir);
+    this.bytesAcked = len;
+    this.bytesOnDisk = len;
+    this.writer = writer;
+  }
+
+  @Override
+  public long getVisibleLength() {
+    return -1;
+  }
+  
+  @Override  //ReplicaInfo
+  public ReplicaState getState() {
+    return ReplicaState.TEMPORARY;
+  }
+  
+  @Override // ReplicaInPipelineInterface
+  public long getBytesAcked() {
+    return bytesAcked;
+  }
+  
+  @Override // ReplicaInPipelineInterface
+  public void setBytesAcked(long bytesAcked) {
+    this.bytesAcked = bytesAcked;
+  }
+  
+  @Override // ReplicaInPipelineInterface
+  public long getBytesOnDisk() {
+    return bytesOnDisk;
+  }
+  
+  @Override //ReplicaInPipelineInterface
+  public void setBytesOnDisk(long bytesOnDisk) {
+    this.bytesOnDisk = bytesOnDisk;
+  }
+  
+  /**
+   * Set the thread that is writing to this replica
+   * @param writer a thread writing to this replica
+   */
+  public void setWriter(Thread writer) {
+    this.writer = writer;
+  }
+  
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+  
+  /**
+   * Interrupt the writing thread and wait until it dies
+   * @throws IOException if the waiting for the writer thread is interrupted
+   */
+  void stopWriter() throws IOException {
+    if (writer != null && writer != Thread.currentThread() && writer.isAlive()) {
+      writer.interrupt();
+      try {
+        writer.join();
+      } catch (InterruptedException e) {
+        throw new IOException("Waiting for writer thread is interrupted.");
+      }
+    }
+  }
+  
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+  
+  @Override // ReplicaInPipelineInterface
+  public BlockWriteStreams createStreams() throws IOException {
+    File blockFile = getBlockFile();
+    File metaFile = getMetaFile();
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("writeTo blockfile is " + blockFile +
+                         " of size " + blockFile.length());
+      DataNode.LOG.debug("writeTo metafile is " + metaFile +
+                         " of size " + metaFile.length());
+    }
+    FileOutputStream blockOut = null;
+    FileOutputStream crcOut = null;
+    try {
+      blockOut = new FileOutputStream(
+          new RandomAccessFile( blockFile, "rw" ).getFD() );
+      crcOut = new FileOutputStream(
+          new RandomAccessFile( metaFile, "rw" ).getFD() );
+      return new BlockWriteStreams(blockOut, crcOut);
+    } catch (IOException e) {
+      IOUtils.closeStream(blockOut);
+      IOUtils.closeStream(crcOut);
+      throw e;
+    }
+  }
+  
+  @Override
+  public String toString() {
+    return super.toString()
+        + "\n  bytesAcked=" + bytesAcked
+        + "\n  bytesOnDisk=" + bytesOnDisk;
+  }
+}
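
For context, a minimal usage sketch (not part of this patch) of the writer-side
lifecycle: a hypothetical receiver creates the temporary replica, opens its streams
via createStreams(), and a second thread may later stop the writer. The blockId,
genStamp, vol and dir values are assumed to come from the DataNode's volume selection.

    // Sketch only: create a temporary replica owned by the current thread
    // and open one stream for block data and one for CRCs.
    ReplicaInPipeline replica = new ReplicaInPipeline(blockId, genStamp, vol, dir);
    BlockWriteStreams streams = replica.createStreams();
    // ... the writer thread appends packets and updates the byte counters ...
    // From another thread (e.g. pipeline recovery), interrupt and join the writer:
    //   replica.stopWriter();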

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
+
+/** 
+ * This defines the interface of a replica in a pipeline that is being written to.
+ */
+interface ReplicaInPipelineInterface extends Replica {
+  /**
+   * Set the number of bytes received
+   * @param bytesReceived number of bytes received
+   */
+  void setNumBytes(long bytesReceived);
+  
+  /**
+   * Get the number of bytes acked
+   * @return the number of bytes acked
+   */
+  long getBytesAcked();
+  
+  /**
+   * Set the number of bytes that have been acked
+   * @param bytesAcked the number of bytes acked
+   */
+  void setBytesAcked(long bytesAcked);
+  
+  /**
+   * Set the number of bytes on disk
+   * @param bytesOnDisk number of bytes on disk
+   */
+  void setBytesOnDisk(long bytesOnDisk);
+  
+  /**
+   * Create output streams for writing to this replica, 
+   * one for the block file and one for the CRC file
+   * 
+   * @return output streams for writing
+   * @throws IOException if any error occurs
+   */
+  public BlockWriteStreams createStreams() throws IOException;
+}
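
A sketch (not part of this patch) of how a receiver might drive this interface as
packets arrive; the helper method, its parameters, and the surrounding packet framing
are illustrative only.

    // Sketch only: append one packet and update the replica's counters.
    static long writePacket(ReplicaInPipelineInterface replica,
        java.io.OutputStream dataOut, byte[] packet, long offsetInBlock)
        throws java.io.IOException {
      dataOut.write(packet);              // persist the packet payload
      long newLength = offsetInBlock + packet.length;
      replica.setNumBytes(newLength);     // total bytes received so far
      replica.setBytesOnDisk(newLength);  // bytes flushed to the block file
      return newLength;
    }
    // Once downstream datanodes acknowledge the packet:
    //   replica.setBytesAcked(newLength);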

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Wed Sep 30 23:39:30 2009
@@ -22,69 +22,137 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileUtil.HardLink;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.io.IOUtils;
 
 /**
- * This class is used by the datanode to maintain the map from a block 
- * to its metadata.
+ * This class is used by a datanode to maintain the metadata of its replicas.
+ * It provides a general interface for the meta information of a replica.
  */
-class ReplicaInfo {
-
-  private FSVolume volume;       // volume where the block belongs
-  private File     file;         // block file
-  private boolean detached;      // copy-on-write done for block
+abstract public class ReplicaInfo extends Block implements Replica {
+  private FSVolume volume;      // volume where the replica belongs
+  private File     dir;         // directory where block & meta files belong
 
-  ReplicaInfo(FSVolume vol, File file) {
-    this.volume = vol;
-    this.file = file;
-    detached = false;
+  /**
+   * Constructor for a zero length replica
+   * @param blockId block id
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaInfo(long blockId, long genStamp, FSVolume vol, File dir) {
+    this( blockId, 0L, genStamp, vol, dir);
   }
-
-  ReplicaInfo(FSVolume vol) {
+  
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaInfo(Block block, FSVolume vol, File dir) {
+    this(block.getBlockId(), block.getNumBytes(), 
+        block.getGenerationStamp(), vol, dir);
+  }
+  
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaInfo(long blockId, long len, long genStamp,
+      FSVolume vol, File dir) {
+    super(blockId, len, genStamp);
     this.volume = vol;
-    this.file = null;
-    detached = false;
+    this.dir = dir;
   }
 
+  /**
+   * Get this replica's meta file name
+   * @return this replica's meta file name
+   */
+  private String getMetaFileName() {
+    return getBlockName() + "_" + getGenerationStamp() + METADATA_EXTENSION; 
+  }
+  
+  /**
+   * Get the full path of this replica's data file
+   * @return the full path of this replica's data file
+   */
+  File getBlockFile() {
+    return new File(getDir(), getBlockName());
+  }
+  
+  /**
+   * Get the full path of this replica's meta file
+   * @return the full path of this replica's meta file
+   */
+  File getMetaFile() {
+    return new File(getDir(), getMetaFileName());
+  }
+  
+  /**
+   * Get the volume where this replica is located on disk
+   * @return the volume where this replica is located on disk
+   */
   FSVolume getVolume() {
     return volume;
   }
-
-  File getFile() {
-    return file;
+  
+  /**
+   * Set the volume where this replica is located on disk
+   */
+  void setVolume(FSVolume vol) {
+    this.volume = vol;
   }
-
-  void setFile(File f) {
-    file = f;
+  
+  /**
+   * Return the parent directory path where this replica is located
+   * @return the parent directory path where this replica is located
+   */
+  File getDir() {
+    return dir;
   }
 
   /**
-   * Is this block already detached?
+   * Set the parent directory where this replica is located
+   * @param dir the parent directory where the replica is located
    */
-  boolean isDetached() {
-    return detached;
+  void setDir(File dir) {
+    this.dir = dir;
   }
 
   /**
-   *  Block has been successfully detached
+   * Check if this replica has already been unlinked.
+   * @return true if the replica has already been unlinked 
+   *         or does not need to be unlinked; false otherwise
    */
-  void setDetached() {
-    detached = true;
+  boolean isUnlinked() {
+    return true;                // no need to be unlinked
   }
 
   /**
+   * Set this replica as unlinked
+   */
+  void setUnlinked() {
+    // no need to be unlinked
+  }
+  
+   /**
    * Copy specified file into a temporary file. Then rename the
    * temporary file to the original name. This will cause any
    * hardlinks to the original file to be removed. The temporary
-   * files are created in the detachDir. The temporary files will
+   * files are created in the same directory. The temporary files will
    * be recovered (especially on Windows) on datanode restart.
    */
-  private void detachFile(File file, Block b) throws IOException {
-    File tmpFile = volume.createDetachFile(b, file.getName());
+  private void unlinkFile(File file, Block b) throws IOException {
+    File tmpFile = FSDataset.createTmpFile(b, FSDataset.getUnlinkTmpFile(file));
     try {
       FileInputStream in = new FileInputStream(file);
       try {
@@ -114,33 +182,60 @@
   }
 
   /**
-   * Returns true if this block was copied, otherwise returns false.
+   * Remove a hard link by copying the block to a temporary place and 
+   * then moving it back
+   * @param numLinks number of hard links
+   * @return true if copy is successful; 
+   *         false if it is already unlinked or does not need to be unlinked
+   * @throws IOException if there is any copy error
    */
-  boolean detachBlock(Block block, int numLinks) throws IOException {
-    if (isDetached()) {
+  boolean unlinkBlock(int numLinks) throws IOException {
+    if (isUnlinked()) {
       return false;
     }
-    if (file == null || volume == null) {
-      throw new IOException("detachBlock:Block not found. " + block);
+    File file = getBlockFile();
+    if (file == null || getVolume() == null) {
+      throw new IOException("detachBlock:Block not found. " + this);
     }
-    File meta = FSDataset.getMetaFile(file, block);
+    File meta = getMetaFile();
     if (meta == null) {
-      throw new IOException("Meta file not found for block " + block);
+      throw new IOException("Meta file not found for block " + this);
     }
 
     if (HardLink.getLinkCount(file) > numLinks) {
-      DataNode.LOG.info("CopyOnWrite for block " + block);
-      detachFile(file, block);
+      DataNode.LOG.info("CopyOnWrite for block " + this);
+      unlinkFile(file, this);
     }
     if (HardLink.getLinkCount(meta) > numLinks) {
-      detachFile(meta, block);
+      unlinkFile(meta, this);
     }
-    setDetached();
+    setUnlinked();
     return true;
   }
+
+  /**
+   * Set this replica's generation stamp to be a newer one
+   * @param newGS new generation stamp
+   * @throws IOException if the new generation stamp is not greater than the current one
+   */
+  void setNewerGenerationStamp(long newGS) throws IOException {
+    long curGS = getGenerationStamp();
+    if (newGS <= curGS) {
+      throw new IOException("New generation stamp (" + newGS 
+          + ") must be greater than current one (" + curGS + ")");
+    }
+    setGenerationStamp(newGS);
+  }
   
+  @Override  //Object
   public String toString() {
-    return getClass().getSimpleName() + "(volume=" + volume
-        + ", file=" + file + ", detached=" + detached + ")";
+    return getClass().getSimpleName()
+        + ", " + super.toString()
+        + ", " + getState()
+        + "\n  getNumBytes()     = " + getNumBytes()
+        + "\n  getBytesOnDisk()  = " + getBytesOnDisk()
+        + "\n  getVisibleLength()= " + getVisibleLength()
+        + "\n  getVolume()       = " + getVolume()
+        + "\n  getBlockFile()    = " + getBlockFile();
   }
 }
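
A sketch (not part of this patch) of the copy-on-write path above: before modifying a
finalized replica whose files may still be hard-linked (for example after an upgrade
snapshot), the caller breaks the links. The 'replica' variable is assumed to come from
the dataset's replica map.

    // Sketch only: break hard links before appending to a finalized replica.
    // The argument is the number of links the files are allowed to keep.
    if (replica.unlinkBlock(1)) {
      DataNode.LOG.info("Unlinked " + replica + " before modifying it");
    }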

Propchange: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 23:39:30 2009
@@ -1,2 +1,6 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java:713112
+/hadoop/core/branches/branch-0.19/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:713112
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java:776175-785643,785929-786278
+/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
+/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
+/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:820487

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Exception indicating that the DataNode does not have a replica
+ * that matches the target block.
+ */
+class ReplicaNotFoundException extends IOException {
+  private static final long serialVersionUID = 1L;
+  final static String NON_RBW_REPLICA = "Cannot recover a non-RBW replica ";
+  final static String UNFINALIZED_REPLICA = 
+    "Cannot append to an unfinalized replica ";
+  final static String UNFINALIZED_AND_NONRBW_REPLICA = 
+    "Cannot recover append/close to a replica that's not FINALIZED and not RBW ";
+  final static String NON_EXISTENT_REPLICA =
+    "Cannot append to a non-existent replica ";
+  final static String UNEXPECTED_GS_REPLICA =
+    "Cannot append to a replica with unexpeted generation stamp ";
+
+  public ReplicaNotFoundException() {
+    super();
+  }
+
+  ReplicaNotFoundException(Block b) {
+    super("Replica not found for " + b);
+  }
+  
+  public ReplicaNotFoundException(String msg) {
+    super(msg);
+  }
+}
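
A sketch (not part of this patch) showing how the message constants above might be
combined with a block when a lookup fails; 'volumeMap' refers to the ReplicasMap added
later in this commit and is assumed to be in scope.

    // Sketch only: report a missing or mismatched replica during append.
    ReplicaInfo replica = volumeMap.get(block.getBlockId());
    if (replica == null) {
      throw new ReplicaNotFoundException(
          ReplicaNotFoundException.NON_EXISTENT_REPLICA + block);
    }
    if (replica.getGenerationStamp() != block.getGenerationStamp()) {
      throw new ReplicaNotFoundException(
          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + block);
    }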

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+
+/**
+ * This class represents a replica that is under block recovery.
+ * It has a recovery id that is equal to the generation stamp 
+ * that the replica will be bumped to after the recovery succeeds.
+ * The recovery id is used to handle multiple concurrent block recoveries:
+ * a recovery with a higher recovery id preempts recoveries with a lower id.
+ */
+class ReplicaUnderRecovery extends ReplicaInfo {
+  private ReplicaInfo original; // the original replica that needs to be recovered
+  private long recoveryId; // recovery id; it is also the generation stamp 
+                           // that the replica will be bumped to after recovery
+
+  ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
+    super(replica.getBlockId(), replica.getNumBytes(), replica.getGenerationStamp(),
+        replica.getVolume(), replica.getDir());
+    if ( replica.getState() != ReplicaState.FINALIZED &&
+         replica.getState() != ReplicaState.RBW &&
+         replica.getState() != ReplicaState.RWR ) {
+      throw new IllegalArgumentException("Cannot recover replica: " + replica);
+    }
+    this.original = replica;
+    this.recoveryId = recoveryId;
+  }
+
+  /** 
+   * Get the recovery id
+   * @return the generation stamp that the replica will be bumped to 
+   */
+  long getRecoveryID() {
+    return recoveryId;
+  }
+
+  /** 
+   * Set the recovery id
+   * @param recoveryId the new recoveryId
+   */
+  void setRecoveryID(long recoveryId) {
+    if (recoveryId > this.recoveryId) {
+      this.recoveryId = recoveryId;
+    } else {
+      throw new IllegalArgumentException("The new recovery id: " + recoveryId
+          + " must be greater than the current one: " + this.recoveryId);
+    }
+  }
+
+  /**
+   * Get the original replica that's under recovery
+   * @return the original replica under recovery
+   */
+  ReplicaInfo getOriginalReplica() {
+    return original;
+  }
+  
+  /**
+   * Get the original replica's state
+   * @return the original replica's state
+   */
+  ReplicaState getOrignalReplicaState() {
+    return original.getState();
+  }
+
+  @Override //ReplicaInfo
+  boolean isUnlinked() {
+    return original.isUnlinked();
+  }
+
+  @Override //ReplicaInfo
+  void setUnlinked() {
+    original.setUnlinked();
+  }
+  
+  @Override //ReplicaInfo
+  public ReplicaState getState() {
+    return ReplicaState.RUR;
+  }
+  
+  @Override
+  public long getVisibleLength() {
+    return original.getVisibleLength();
+  }
+
+  @Override
+  public long getBytesOnDisk() {
+    return original.getBytesOnDisk();
+  }
+
+  @Override  //org.apache.hadoop.hdfs.protocol.Block
+  public void setBlockId(long blockId) {
+    super.setBlockId(blockId);
+    original.setBlockId(blockId);
+  }
+
+  @Override //org.apache.hadoop.hdfs.protocol.Block
+  public void setGenerationStamp(long gs) {
+    super.setGenerationStamp(gs);
+    original.setGenerationStamp(gs);
+  }
+  
+  @Override //org.apache.hadoop.hdfs.protocol.Block
+  public void setNumBytes(long numBytes) {
+    super.setNumBytes(numBytes);
+    original.setNumBytes(numBytes);
+  }
+  
+  @Override //ReplicaInfo
+  void setDir(File dir) {
+    super.setDir(dir);
+    original.setDir(dir);
+  }
+  
+  @Override //ReplicaInfo
+  void setVolume(FSVolume vol) {
+    super.setVolume(vol);
+    original.setVolume(vol);
+  }
+  
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+  
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return super.toString()
+        + "\n  recoveryId=" + recoveryId
+        + "\n  original=" + original;
+  }
+
+  ReplicaRecoveryInfo createInfo() {
+    return new ReplicaRecoveryInfo(this, getOrignalReplicaState()); 
+  }
+}
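
A sketch (not part of this patch) of the recovery-id preemption described above: a
later recovery with a higher id takes over an in-progress one, while a stale id is
rejected. The 'replica' and 'recoveryId' values are illustrative.

    // Sketch only: start a recovery or preempt an ongoing one.
    ReplicaUnderRecovery rur;
    if (replica instanceof ReplicaUnderRecovery) {
      rur = (ReplicaUnderRecovery) replica;
      rur.setRecoveryID(recoveryId);   // throws if not greater than the current id
    } else {
      rur = new ReplicaUnderRecovery(replica, recoveryId);
    }
    ReplicaRecoveryInfo info = rur.createInfo();  // snapshot for the recovery coordinator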

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * This class represents a replica that is waiting to be recovered.
+ * After a datanode restart, any replica in the "rbw" directory is loaded
+ * as a replica waiting to be recovered.
+ * A replica waiting to be recovered neither serves read requests nor
+ * participates in any pipeline recovery. It will either become outdated
+ * if its client continues to write to it, or be recovered as a result of
+ * lease recovery.
+ */
+class ReplicaWaitingToBeRecovered extends ReplicaInfo {
+  private boolean unlinked;      // copy-on-write done for block
+
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaWaitingToBeRecovered(long blockId, long len, long genStamp,
+      FSVolume vol, File dir) {
+    super(blockId, len, genStamp, vol, dir);
+  }
+  
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaWaitingToBeRecovered(Block block, FSVolume vol, File dir) {
+    super(block, vol, dir);
+  }
+  
+  @Override //ReplicaInfo
+  public ReplicaState getState() {
+    return ReplicaState.RWR;
+  }
+  
+  @Override //ReplicaInfo
+  boolean isUnlinked() {
+    return unlinked;
+  }
+
+  @Override //ReplicaInfo
+  void setUnlinked() {
+    unlinked = true;
+  }
+  
+  @Override //ReplicaInfo
+  public long getVisibleLength() {
+    return -1;  //no bytes are visible
+  }
+  
+  @Override
+  public long getBytesOnDisk() {
+    return getNumBytes();
+  }
+
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+  
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return super.toString()
+        + "\n  unlinked=" + unlinked;
+  }
+}

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+class ReplicasMap {
+  // HashMap: maps a block id to the replica's meta info
+  private HashMap<Long, ReplicaInfo> map = new HashMap<Long, ReplicaInfo>();
+  /**
+   * Get the meta information of the replica that matches both block id 
+   * and generation stamp
+   * @param block block with its id as the key
+   * @return the replica's meta information
+   * @throws IllegalArgumentException if the input block is null
+   */
+  ReplicaInfo get(Block block) {
+    if (block == null) {
+      throw new IllegalArgumentException("Do not expect null block");
+    }
+    ReplicaInfo replicaInfo = get(block.getBlockId());
+    if (replicaInfo != null && 
+        block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
+      return replicaInfo;
+    }
+    return null;
+  }
+  
+  /**
+   * Get the meta information of the replica that matches the block id
+   * @param blockId a block's id
+   * @return the replica's meta information
+   */
+  ReplicaInfo get(long blockId) {
+    return map.get(blockId);
+  }
+  
+  /**
+   * Add a replica's meta information into the map 
+   * 
+   * @param replicaInfo a replica's meta information
+   * @return previous meta information of the replica
+   * @throws IllegalArgumentException if the input parameter is null
+   */
+  ReplicaInfo add(ReplicaInfo replicaInfo) {
+    if (replicaInfo == null) {
+      throw new IllegalArgumentException("Do not expect null block");
+    }
+    return  map.put(replicaInfo.getBlockId(), replicaInfo);
+  }
+  
+  /**
+   * Remove the replica's meta information from the map that matches
+   * the input block's id and generation stamp
+   * @param block block with its id as the key
+   * @return the removed replica's meta information
+   * @throws IllegalArgumentException if the input block is null
+   */
+  ReplicaInfo remove(Block block) {
+    if (block == null) {
+      throw new IllegalArgumentException("Do not expect null block");
+    }
+    Long key = Long.valueOf(block.getBlockId());
+    ReplicaInfo replicaInfo = map.get(key);
+    if (replicaInfo != null &&
+        replicaInfo.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
+      return remove(key);
+    } 
+    
+    return null;
+  }
+  
+  /**
+   * Remove the replica's meta information from the map if present
+   * @param blockId the block id of the replica to be removed
+   * @return the removed replica's meta information
+   */
+  ReplicaInfo remove(long blockId) {
+    return map.remove(blockId);
+  }
+ 
+  /**
+   * Get the size of the map
+   * @return the number of replicas in the map
+   */
+  int size() {
+    return map.size();
+  }
+  
+  /**
+   * Get a collection of the replicas
+   * @return a collection of the replicas
+   */
+  Collection<ReplicaInfo> replicas() {
+    return map.values();
+  }
+}
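
A sketch (not part of this patch) of typical operations on the map; callers are
expected to hold the dataset lock, which is omitted here, and 'newReplica' and 'block'
are illustrative.

    // Sketch only: register a replica, then look it up by block.
    ReplicasMap volumeMap = new ReplicasMap();
    ReplicaInfo displaced = volumeMap.add(newReplica); // previous mapping, if any
    ReplicaInfo match = volumeMap.get(block);          // null unless id and GS both match
    if (match == null) {
      match = volumeMap.get(block.getBlockId());       // match on block id only
    }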

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java Wed Sep 30 23:39:30 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 
 /**
  * Internal class for block metadata.
@@ -35,12 +36,22 @@
    */
   private Object[] triplets;
 
-  BlockInfo(Block blk, int replication) {
+  protected BlockInfo(Block blk, int replication) {
     super(blk);
     this.triplets = new Object[3*replication];
     this.inode = null;
   }
 
+  /**
+   * Copy construction.
+   * This is used to convert a BlockInfoUnderConstruction to a complete BlockInfo.
+   * @param from BlockInfo to copy from.
+   */
+  protected BlockInfo(BlockInfo from) {
+    this(from, from.inode.getReplication());
+    this.inode = from.inode;
+  }
+
   INodeFile getINode() {
     return inode;
   }
@@ -64,7 +75,7 @@
     assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
     BlockInfo info = (BlockInfo)triplets[index*3+1];
     assert info == null || 
-        BlockInfo.class.getName().equals(info.getClass().getName()) : 
+        info.getClass().getName().startsWith(BlockInfo.class.getName()) : 
               "BlockInfo is expected at " + index*3;
     return info;
   }
@@ -74,7 +85,7 @@
     assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
     BlockInfo info = (BlockInfo)triplets[index*3+2];
     assert info == null || 
-        BlockInfo.class.getName().equals(info.getClass().getName()) : 
+        info.getClass().getName().startsWith(BlockInfo.class.getName()) : 
               "BlockInfo is expected at " + index*3;
     return info;
   }
@@ -262,6 +273,43 @@
     return true;
   }
 
+  /**
+   * BlockInfo represents a block that is not being constructed.
+   * In order to start modifying the block, the BlockInfo should be converted
+   * to {@link BlockInfoUnderConstruction}.
+   * @return {@link BlockUCState#COMPLETE}
+   */
+  BlockUCState getBlockUCState() {
+    return BlockUCState.COMPLETE;
+  }
+
+  /**
+   * Is this block complete?
+   * 
+   * @return true if the state of the block is {@link BlockUCState#COMPLETE}
+   */
+  boolean isComplete() {
+    return getBlockUCState().equals(BlockUCState.COMPLETE);
+  }
+
+  /**
+   * Convert a complete block to an under construction block.
+   * 
+   * @return BlockInfoUnderConstruction -  an under construction block.
+   */
+  BlockInfoUnderConstruction convertToBlockUnderConstruction(
+      BlockUCState s, DatanodeDescriptor[] targets) {
+    if(isComplete()) {
+      return new BlockInfoUnderConstruction(
+          this, getINode().getReplication(), s, targets);
+    }
+    // the block is already under construction
+    BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;
+    ucBlock.setBlockUCState(s);
+    ucBlock.setExpectedLocations(targets);
+    return ucBlock;
+  }
+
   @Override
   public int hashCode() {
     // Super implementation is sufficient
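
A sketch (not part of this patch) of the conversion introduced above, as the namenode
might use it when a client reopens the last block of a file for append; 'lastBlock'
and 'targets' are illustrative.

    // Sketch only: turn a COMPLETE block back into an under-construction one.
    if (lastBlock.isComplete()) {
      BlockInfoUnderConstruction uc = lastBlock.convertToBlockUnderConstruction(
          BlockUCState.UNDER_CONSTRUCTION, targets);
      // the file inode and the blocks map must now reference 'uc' instead of 'lastBlock'
    }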

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+
+/**
+ * Represents a block that is currently being constructed.<br>
+ * This is usually the last block of a file opened for write or append.
+ */
+class BlockInfoUnderConstruction extends BlockInfo {
+  /** Block state. See {@link BlockUCState} */
+  private BlockUCState blockUCState;
+
+  /**
+   * Block replicas as assigned when the block was allocated.
+   * This defines the pipeline order.
+   */
+  private List<ReplicaUnderConstruction> replicas;
+
+  /** A data-node responsible for block recovery. */
+  private int primaryNodeIndex = -1;
+
+  /**
+   * The new generation stamp, which this block will have
+   * after the recovery succeeds. Also used as a recovery id to identify
+   * the right recovery if any of the abandoned recoveries re-appear.
+   */
+  private long blockRecoveryId = 0;
+
+  /**
+   * ReplicaUnderConstruction contains information about replicas while
+   * they are under construction.
+   * The GS, the length and the state of the replica are as reported by 
+   * the data-node.
+   * It is not guaranteed, but expected, that data-nodes actually have
+   * corresponding replicas.
+   */
+  static class ReplicaUnderConstruction extends Block {
+    private DatanodeDescriptor expectedLocation;
+    private ReplicaState state;
+
+    ReplicaUnderConstruction(Block block,
+                             DatanodeDescriptor target,
+                             ReplicaState state) {
+      super(block);
+      this.expectedLocation = target;
+      this.state = state;
+    }
+
+    /**
+     * Expected block replica location as assigned when the block was allocated.
+     * This defines the pipeline order.
+     * It is not guaranteed, but expected, that the data-node actually has
+     * the replica.
+     */
+    DatanodeDescriptor getExpectedLocation() {
+      return expectedLocation;
+    }
+
+    /**
+     * Get replica state as reported by the data-node.
+     */
+    ReplicaState getState() {
+      return state;
+    }
+
+    /**
+     * Set replica state.
+     */
+    void setState(ReplicaState s) {
+      state = s;
+    }
+
+    /**
+     * Whether the data-node the replica belongs to is alive.
+     */
+    boolean isAlive() {
+      return expectedLocation.isAlive;
+    }
+
+    @Override // Block
+    public int hashCode() {
+      return super.hashCode();
+    }
+
+    @Override // Block
+    public boolean equals(Object obj) {
+      // Sufficient to rely on super's implementation
+      return (this == obj) || super.equals(obj);
+    }
+  }
+
+  /**
+   * Create block and set its state to
+   * {@link BlockUCState#UNDER_CONSTRUCTION}.
+   */
+  BlockInfoUnderConstruction(Block blk, int replication) {
+    this(blk, replication, BlockUCState.UNDER_CONSTRUCTION, null);
+  }
+
+  BlockInfoUnderConstruction(Block blk, int replication,
+                             BlockUCState state,
+                             DatanodeDescriptor[] targets) {
+    super(blk, replication);
+    assert getBlockUCState() != BlockUCState.COMPLETE :
+      "BlockInfoUnderConstruction cannot be in COMPLETE state";
+    this.blockUCState = state;
+    setExpectedLocations(targets);
+  }
+
+  /**
+   * Convert an under construction block to a complete block.
+   * 
+   * @return BlockInfo - a complete block.
+   * @throws IOException if the state of the block 
+   * (the generation stamp and the length) has not been committed by 
+   * the client or it does not have at least a minimal number of replicas 
+   * reported from data-nodes. 
+   */
+  BlockInfo convertToCompleteBlock() throws IOException {
+    assert getBlockUCState() != BlockUCState.COMPLETE :
+      "Trying to convert a COMPLETE block";
+    if(getBlockUCState() != BlockUCState.COMMITTED)
+      throw new IOException(
+          "Cannot complete block: block has not been COMMITTED by the client");
+    return new BlockInfo(this);
+  }
+
+  void setExpectedLocations(DatanodeDescriptor[] targets) {
+    int numLocations = targets == null ? 0 : targets.length;
+    this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
+    for(int i = 0; i < numLocations; i++)
+      replicas.add(
+        new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
+  }
+
+  /**
+   * Create array of expected replica locations
+   * (as has been assigned by chooseTargets()).
+   */
+  DatanodeDescriptor[] getExpectedLocations() {
+    int numLocations = replicas == null ? 0 : replicas.size();
+    DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations];
+    for(int i = 0; i < numLocations; i++)
+      locations[i] = replicas.get(i).getExpectedLocation();
+    return locations;
+  }
+
+  int getNumExpectedLocations() {
+    return replicas == null ? 0 : replicas.size();
+  }
+
+  /**
+   * Return the state of the block under construction.
+   * @see BlockUCState
+   */
+  @Override // BlockInfo
+  BlockUCState getBlockUCState() {
+    return blockUCState;
+  }
+
+  void setBlockUCState(BlockUCState s) {
+    blockUCState = s;
+  }
+
+  long getBlockRecoveryId() {
+    return blockRecoveryId;
+  }
+
+  /**
+   * Commit block's length and generation stamp as reported by the client.
+   * Set block state to {@link BlockUCState#COMMITTED}.
+   * @param block - contains the client-reported block length and generation stamp
+   * @throws IOException if block ids are inconsistent.
+   */
+  void commitBlock(Block block) throws IOException {
+    if(getBlockId() != block.getBlockId())
+      throw new IOException("Trying to commit inconsistent block: id = "
+          + block.getBlockId() + ", expected id = " + getBlockId());
+    blockUCState = BlockUCState.COMMITTED;
+    this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+  }
+
+  /**
+   * Initialize lease recovery for this block.
+   * Find the first alive data-node starting from the previous primary and
+   * make it primary.
+   */
+  void initializeBlockRecovery(long recoveryId) {
+    setBlockUCState(BlockUCState.UNDER_RECOVERY);
+    blockRecoveryId = recoveryId;
+    if (replicas.size() == 0) {
+      NameNode.stateChangeLog.warn("BLOCK*"
+        + " INodeFileUnderConstruction.initLeaseRecovery:"
+        + " No blocks found, lease removed.");
+    }
+
+    int previous = primaryNodeIndex;
+    for(int i = 1; i <= replicas.size(); i++) {
+      int j = (previous + i)%replicas.size();
+      if (replicas.get(j).isAlive()) {
+        primaryNodeIndex = j;
+        DatanodeDescriptor primary = replicas.get(j).getExpectedLocation(); 
+        primary.addBlockToBeRecovered(this);
+        NameNode.stateChangeLog.info("BLOCK* " + this
+          + " recovery started, primary=" + primary);
+        return;
+      }
+    }
+  }
+
+  void addReplicaIfNotPresent(DatanodeDescriptor dn,
+                     Block block,
+                     ReplicaState rState) {
+    for(ReplicaUnderConstruction r : replicas)
+      if(r.getExpectedLocation() == dn)
+        return;
+    replicas.add(new ReplicaUnderConstruction(block, dn, rState));
+  }
+
+  @Override // BlockInfo
+  // BlockInfoUnderConstruction participates in maps the same way as BlockInfo
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override // BlockInfo
+  public boolean equals(Object obj) {
+    // Sufficient to rely on super's implementation
+    return (this == obj) || super.equals(obj);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName());
+    b.append("{")
+     .append("\n  blockUCState=").append(blockUCState)
+     .append("\n  replicas=").append(replicas)
+     .append("\n  primaryNodeIndex=").append(primaryNodeIndex)
+     .append("}");
+    return b.toString();
+  }
+}
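
A sketch (not part of this patch) of the commit-then-complete sequence: the client
reports the final length and generation stamp, the block becomes COMMITTED, and once
enough datanodes have reported finalized replicas it is converted to a COMPLETE
BlockInfo. 'file', 'clientReportedBlock' and 'minReplication' are illustrative; the
same check appears in the BlockManager changes below.

    // Sketch only: commit the last block, then complete it when safe.
    BlockInfoUnderConstruction uc =
        (BlockInfoUnderConstruction) file.getLastBlock();
    uc.commitBlock(clientReportedBlock);       // state -> COMMITTED
    if (uc.numNodes() >= minReplication) {     // enough finalized replicas reported
      BlockInfo complete = uc.convertToCompleteBlock();   // state -> COMPLETE
      // replace 'uc' with 'complete' in the inode and in the blocks map
    }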

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Wed Sep 30 23:39:30 2009
@@ -22,7 +22,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -37,9 +36,11 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
-import org.apache.hadoop.security.AccessTokenHandler;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -237,6 +238,80 @@
   }
 
   /**
+   * Commit the last block of the file.
+   * 
+   * @param fileINode file inode
+   * @param commitBlock - contains the client-reported block length and generation stamp
+   * @throws IOException if the block does not have at least a minimal number
+   * of replicas reported from data-nodes.
+   */
+  void commitLastBlock(INodeFileUnderConstruction fileINode, 
+                       Block commitBlock) throws IOException {
+    if(commitBlock == null)
+      return; // not committing, this is a block allocation retry
+    BlockInfo lastBlock = fileINode.getLastBlock();
+    if(lastBlock == null)
+      return; // no blocks in file yet
+    if(lastBlock.isComplete())
+      return; // already completed (e.g. by syncBlock)
+    assert lastBlock.getNumBytes() <= commitBlock.getNumBytes() :
+      "commitBlock length is less than the stored one "
+      + commitBlock.getNumBytes() + " vs. " + lastBlock.getNumBytes();
+    ((BlockInfoUnderConstruction)lastBlock).commitBlock(commitBlock);
+  }
+
+  /**
+   * Convert a specified block of the file to a complete block.
+   * @param fileINode file
+   * @param blkIndex  block index in the file
+   * @throws IOException if the block does not have at least a minimal number
+   * of replicas reported from data-nodes.
+   */
+  BlockInfo completeBlock(INodeFile fileINode, int blkIndex)
+  throws IOException {
+    if(blkIndex < 0)
+      return null;
+    BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
+    if(curBlock.isComplete())
+      return curBlock;
+    BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
+    if(ucBlock.numNodes() < minReplication)
+      throw new IOException("Cannot complete block: " +
+          "block does not satisfy minimal replication requirement.");
+    BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
+    // replace penultimate block in file
+    fileINode.setBlock(blkIndex, completeBlock);
+    // replace block in the blocksMap
+    return blocksMap.replaceBlock(completeBlock);
+  }
+
+  BlockInfo completeBlock(INodeFile fileINode, BlockInfo block)
+  throws IOException {
+    BlockInfo[] fileBlocks = fileINode.getBlocks();
+    for(int idx = 0; idx < fileBlocks.length; idx++)
+      if(fileBlocks[idx] == block) {
+        return completeBlock(fileINode, idx);
+      }
+    return block;
+  }
+
+  /**
+   * Convert the last block of the file to an under construction block.
+   * @param fileINode file
+   * @param targets data-nodes that will form the pipeline for this block
+   */
+  void convertLastBlockToUnderConstruction(
+      INodeFileUnderConstruction fileINode,
+      DatanodeDescriptor[] targets) throws IOException {
+    BlockInfo oldBlock = fileINode.getLastBlock();
+    if(oldBlock == null)
+      return;
+    BlockInfoUnderConstruction ucBlock =
+      fileINode.setLastBlock(oldBlock, targets);
+    blocksMap.replaceBlock(ucBlock);
+  }
+
+  /**
    * Get all valid locations of the block
    */
   ArrayList<String> getValidLocations(Block block) {
@@ -254,7 +329,7 @@
     return machineSet;
   }
 
-  List<LocatedBlock> getBlockLocations(Block[] blocks, long offset,
+  List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
       long length, int nrBlocksToReturn) throws IOException {
     int curBlk = 0;
     long curPos = 0, blkSize = 0;
@@ -269,43 +344,12 @@
     }
 
     if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
-      return null;
+      return Collections.<LocatedBlock>emptyList();
 
     long endOff = offset + length;
     List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
     do {
-      // get block locations
-      int numNodes = blocksMap.numNodes(blocks[curBlk]);
-      int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
-      int numCorruptReplicas = corruptReplicas
-          .numCorruptReplicas(blocks[curBlk]);
-      if (numCorruptNodes != numCorruptReplicas) {
-        FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
-            + blocks[curBlk] + "blockMap has " + numCorruptNodes
-            + " but corrupt replicas map has " + numCorruptReplicas);
-      }
-      boolean blockCorrupt = (numCorruptNodes == numNodes);
-      int numMachineSet = blockCorrupt ? numNodes :
-                          (numNodes - numCorruptNodes);
-      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
-      if (numMachineSet > 0) {
-        numNodes = 0;
-        for (Iterator<DatanodeDescriptor> it = 
-             blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
-          DatanodeDescriptor dn = it.next();
-          boolean replicaCorrupt = 
-            corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
-          if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
-            machineSet[numNodes++] = dn;
-        }
-      }
-      LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
-          blockCorrupt);
-      if (namesystem.isAccessTokenEnabled) {
-        b.setAccessToken(namesystem.accessTokenHandler.generateToken(b.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.READ)));
-      }
-      results.add(b);
+      results.add(getBlockLocation(blocks[curBlk], curPos));
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff 
@@ -314,6 +358,41 @@
     return results;
   }
 
+  /** @return a LocatedBlock for the given block */
+  LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
+      ) throws IOException {
+    if (!blk.isComplete()) {
+      final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
+      final DatanodeDescriptor[] locations = uc.getExpectedLocations();
+      return namesystem.createLocatedBlock(uc, locations, pos, false);
+    }
+
+    // get block locations
+    final int numCorruptNodes = countNodes(blk).corruptReplicas();
+    final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
+    if (numCorruptNodes != numCorruptReplicas) {
+      FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
+          + blk + " blockMap has " + numCorruptNodes
+          + " but corrupt replicas map has " + numCorruptReplicas);
+    }
+
+    final int numNodes = blocksMap.numNodes(blk);
+    final boolean isCorrupt = numCorruptNodes == numNodes;
+    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
+    final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+    if (numMachines > 0) {
+      int j = 0;
+      for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
+          it.hasNext();) {
+        final DatanodeDescriptor d = it.next();
+        final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
+        if (isCorrupt || (!isCorrupt && !replicaCorrupt))
+          machines[j++] = d;
+      }
+    }
+    return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);    
+  }
+
   /**
    * Check whether the replication parameter is within the range
    * determined by system configuration.
@@ -369,7 +448,7 @@
       pendingDeletionBlocksCount++;
       if (log) {
         NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-            + b.getBlockName() + " to " + dn.getName());
+            + b + " to " + dn.getName());
       }
     }
   }
@@ -399,7 +478,7 @@
     }
     if (datanodes.length() != 0) {
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-          + b.getBlockName() + " to " + datanodes.toString());
+          + b + " to " + datanodes.toString());
     }
   }
 
@@ -457,6 +536,10 @@
       addToInvalidates(storedBlock, node);
       return;
     } 
+
+    // Add replica to the data-node if it is not already there
+    node.addBlock(storedBlock);
+
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(storedBlock, node);
     if (countNodes(storedBlock).liveReplicas() > inode.getReplication()) {
@@ -885,7 +968,8 @@
     Collection<Block> toAdd = new LinkedList<Block>();
     Collection<Block> toRemove = new LinkedList<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
-    node.reportDiff(blocksMap, report, toAdd, toRemove, toInvalidate);
+    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    node.reportDiff(this, report, toAdd, toRemove, toInvalidate, toCorrupt);
 
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
@@ -899,6 +983,9 @@
           + " does not belong to any file.");
       addToInvalidates(b, node);
     }
+    for (BlockInfo b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
   }
 
   /**
@@ -908,7 +995,8 @@
    */
   private Block addStoredBlock(final Block block,
                                DatanodeDescriptor node,
-                               DatanodeDescriptor delNodeHint) {
+                               DatanodeDescriptor delNodeHint)
+  throws IOException {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
@@ -1020,13 +1108,17 @@
     int numCurrentReplica = numLiveReplicas
       + pendingReplications.getNumReplicas(storedBlock);
 
+    if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
+        numLiveReplicas >= minReplication)
+      storedBlock = completeBlock(fileINode, storedBlock);
+
     // check whether safe replication is reached for the block
-    namesystem.incrementSafeBlockCount(numCurrentReplica);
+    // only complete blocks are counted towards that
+    if(storedBlock.isComplete())
+      namesystem.incrementSafeBlockCount(numCurrentReplica);
 
-    //
-    // if file is being actively written to, then do not check
-    // replication-factor here. It will be checked when the file is closed.
-    //
+    // if file is under construction, then check whether the block
+    // can be completed
     if (fileINode.isUnderConstruction()) {
       return storedBlock;
     }
@@ -1253,7 +1345,30 @@
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.remove(block);
-    addStoredBlock(block, node, delHintNode);
+
+    // blockReceived reports a finalized block
+    Collection<Block> toAdd = new LinkedList<Block>();
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    node.processReportedBlock(this, block, ReplicaState.FINALIZED,
+                              toAdd, toInvalidate, toCorrupt);
+    // the block is only in one of the lists
+    // if it is in none then data-node already has it
+    assert toAdd.size() + toInvalidate.size() <= 1 :
+      "The block should be in at most one of the lists.";
+
+    for (Block b : toAdd) {
+      addStoredBlock(b, node, delHintNode);
+    }
+    for (Block b : toInvalidate) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
+          + b + " on " + node.getName() + " size " + b.getNumBytes()
+          + " does not belong to any file.");
+      addToInvalidates(b, node);
+    }
+    for (BlockInfo b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
   }
 
   /**
@@ -1351,6 +1466,14 @@
     return blocksMap.getStoredBlock(block);
   }
 
+  /**
+   * Find the block by block ID.
+   */
+  BlockInfo findStoredBlock(long blockId) {
+    Block wildcardBlock = new Block(blockId, 0, GenerationStamp.WILDCARD_STAMP);
+    return blocksMap.getStoredBlock(wildcardBlock);
+  }
+
   /* updates a block in under replication queue */
   void updateNeededReplications(Block block, int curReplicasDelta,
       int expectedReplicasDelta) {
@@ -1522,7 +1645,7 @@
     return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter);
   }
 
-  BlockInfo addINode(Block block, INodeFile iNode) {
+  BlockInfo addINode(BlockInfo block, INodeFile iNode) {
     return blocksMap.addINode(block, iNode);
   }
 

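To make the new location-selection rule in getBlockLocation() easier to follow, here is a minimal, self-contained Java sketch; it is not HDFS code, and the String node names and the CorruptChecker interface are hypothetical stand-ins for DatanodeDescriptor and CorruptReplicasMap. The rule: if every replica is corrupt the block is returned with all of its locations and flagged corrupt, otherwise only the non-corrupt locations are returned.

import java.util.ArrayList;
import java.util.List;

class LocationSelectionSketch {
  interface CorruptChecker {
    boolean isReplicaCorrupt(String node);
  }

  static List<String> selectLocations(List<String> replicas, CorruptChecker corrupt) {
    int numCorrupt = 0;
    for (String node : replicas) {
      if (corrupt.isReplicaCorrupt(node)) {
        numCorrupt++;
      }
    }
    // the block as a whole is corrupt only when no good replica is left
    boolean blockCorrupt = numCorrupt == replicas.size();
    List<String> machines = new ArrayList<String>();
    for (String node : replicas) {
      if (blockCorrupt || !corrupt.isReplicaCorrupt(node)) {
        machines.add(node);
      }
    }
    return machines;
  }
}
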
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java Wed Sep 30 23:39:30 2009
@@ -75,11 +75,10 @@
   /**
    * Add block b belonging to the specified file inode to the map.
    */
-  BlockInfo addINode(Block b, INodeFile iNode) {
-    int replication = iNode.getReplication();
+  BlockInfo addINode(BlockInfo b, INodeFile iNode) {
     BlockInfo info = map.get(b);
-    if (info == null) {
-      info = new BlockInfo(b, replication);
+    if (info != b) {
+      info = b;
       map.put(info, info);
     }
     info.setINode(iNode);
@@ -191,4 +190,23 @@
   float getLoadFactor() {
     return loadFactor;
   }
+
+  /**
+   * Replace a block in the block map by a new block.
+   * The new block and the old one have the same key.
+   * @param newBlock - block for replacement
+   * @return new block
+   */
+  BlockInfo replaceBlock(BlockInfo newBlock) {
+    BlockInfo currentBlock = map.get(newBlock);
+    assert currentBlock != null : "the block is not in blocksMap";
+    // replace block in data-node lists
+    for(int idx = currentBlock.numNodes()-1; idx >= 0; idx--) {
+      DatanodeDescriptor dn = currentBlock.getDatanode(idx);
+      dn.replaceBlock(currentBlock, newBlock);
+    }
+    // replace block in the map itself
+    map.put(newBlock, newBlock);
+    return newBlock;
+  }
 }

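The replaceBlock() method added above relies on the map being keyed by block identity, so the new object can be stored under the same key and then re-linked into each datanode's block list. Below is a minimal sketch of that idea, assuming a plain HashMap and a hypothetical BlockStub class in place of BlockInfo; the real code additionally calls DatanodeDescriptor.replaceBlock() for every node holding the replica.

import java.util.HashMap;
import java.util.Map;

class ReplaceBlockSketch {
  static class BlockStub {
    final long id;
    BlockStub(long id) { this.id = id; }
    @Override public boolean equals(Object o) {
      return o instanceof BlockStub && ((BlockStub) o).id == id;
    }
    @Override public int hashCode() { return (int) (id ^ (id >>> 32)); }
  }

  private final Map<BlockStub, BlockStub> map = new HashMap<BlockStub, BlockStub>();

  BlockStub replaceBlock(BlockStub newBlock) {
    BlockStub current = map.get(newBlock);   // same key: same block id
    assert current != null : "the block is not in the map";
    map.put(newBlock, newBlock);             // new object replaces the old value
    return newBlock;
  }
}
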
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Wed Sep 30 23:39:30 2009
@@ -25,7 +25,11 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
@@ -55,29 +59,36 @@
   }
 
   /** A BlockTargetPair queue. */
-  private static class BlockQueue {
-    private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
+  private static class BlockQueue<E> {
+    private final Queue<E> blockq = new LinkedList<E>();
 
     /** Size of the queue */
     synchronized int size() {return blockq.size();}
 
     /** Enqueue */
-    synchronized boolean offer(Block block, DatanodeDescriptor[] targets) { 
-      return blockq.offer(new BlockTargetPair(block, targets));
+    synchronized boolean offer(E e) { 
+      return blockq.offer(e);
     }
 
     /** Dequeue */
-    synchronized List<BlockTargetPair> poll(int numBlocks) {
+    synchronized List<E> poll(int numBlocks) {
       if (numBlocks <= 0 || blockq.isEmpty()) {
         return null;
       }
 
-      List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
+      List<E> results = new ArrayList<E>();
       for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
         results.add(blockq.poll());
       }
       return results;
     }
+
+    /**
+     * Returns <tt>true</tt> if the queue contains the specified element.
+     */
+    boolean contains(E e) {
+      return blockq.contains(e);
+    }
   }
 
   private volatile BlockInfo blockList = null;
@@ -87,9 +98,10 @@
   protected boolean needKeyUpdate = false;
 
   /** A queue of blocks to be replicated by this datanode */
-  private BlockQueue replicateBlocks = new BlockQueue();
+  private BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
   /** A queue of blocks to be recovered by this datanode */
-  private BlockQueue recoverBlocks = new BlockQueue();
+  private BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
+                                new BlockQueue<BlockInfoUnderConstruction>();
   /** A set of blocks to be invalidated by this datanode */
   private Set<Block> invalidateBlocks = new TreeSet<Block>();
 
@@ -201,6 +213,21 @@
     blockList = b.listInsert(blockList, this);
   }
 
+  /**
+   * Replace the specified old block with a new one in the DatanodeDescriptor.
+   * 
+   * @param oldBlock - block to be replaced
+   * @param newBlock - a replacement block
+   * @return the new block
+   */
+  BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
+    boolean done = removeBlock(oldBlock);
+    assert done : "Old block should belong to the data-node when replacing";
+    done = addBlock(newBlock);
+    assert done : "New block should not belong to the data-node when replacing";
+    return newBlock;
+  }
+
   void resetBlocks() {
     this.capacity = 0;
     this.remaining = 0;
@@ -262,15 +289,20 @@
    */
   void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
     assert(block != null && targets != null && targets.length > 0);
-    replicateBlocks.offer(block, targets);
+    replicateBlocks.offer(new BlockTargetPair(block, targets));
   }
 
   /**
    * Store block recovery work.
    */
-  void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
-    assert(block != null && targets != null && targets.length > 0);
-    recoverBlocks.offer(block, targets);
+  void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
+    if(recoverBlocks.contains(block)) {
+      // this prevents adding the same block twice to the recovery queue
+      FSNamesystem.LOG.info("Block " + block +
+                            " is already in the recovery queue.");
+      return;
+    }
+    recoverBlocks.offer(block);
   }
 
   /**
@@ -308,10 +340,16 @@
         new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
   }
 
-  BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
-    List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
-    return blocktargetlist == null? null:
-        new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
+  BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) {
+    List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
+    if(blocks == null)
+      return null;
+    BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.size());
+    for(BlockInfoUnderConstruction b : blocks) {
+      brCommand.add(new RecoveringBlock(
+          b, b.getExpectedLocations(), b.getBlockRecoveryId()));
+    }
+    return brCommand;
   }
 
   /**
@@ -361,11 +399,12 @@
     return blockarray;
   }
 
-  void reportDiff(BlocksMap blocksMap,
+  void reportDiff(BlockManager blockManager,
                   BlockListAsLongs newReport,
                   Collection<Block> toAdd,    // add to DatanodeDescriptor
                   Collection<Block> toRemove, // remove from DatanodeDescriptor
-                  Collection<Block> toInvalidate) { // should be removed from DN
+                  Collection<Block> toInvalidate, // should be removed from DN
+                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -373,29 +412,16 @@
     assert added : "Delimiting block cannot be present in the node";
     if(newReport == null)
       newReport = new BlockListAsLongs();
-    // scan the report and collect newly reported blocks
-    // Note we are taking special precaution to limit tmp blocks allocated
-    // as part this block report - which why block list is stored as longs
-    for (Block iblk : newReport) {
-      BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
-      if(storedBlock == null) {
-        // If block is not in blocksMap it does not belong to any file
-        toInvalidate.add(new Block(iblk));
-        continue;
-      }
-      if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
-        // if the size differs from what is in the blockmap, then return
-        // the new block. addStoredBlock will then pick up the right size of this
-        // block and will update the block object in the BlocksMap
-        if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
-          toAdd.add(new Block(iblk));
-        } else {
-          toAdd.add(storedBlock);
-        }
-        continue;
-      }
+    // scan the report and process newly reported blocks
+    BlockReportIterator itBR = newReport.getBlockReportIterator();
+    while(itBR.hasNext()) {
+      Block iblk = itBR.next();
+      ReplicaState iState = itBR.getCurrentReplicaState();
+      BlockInfo storedBlock = processReportedBlock(blockManager, iblk, iState,
+                                               toAdd, toInvalidate, toCorrupt);
       // move block to the head of the list
-      this.moveBlockToHead(storedBlock);
+      if(storedBlock != null && storedBlock.findDatanode(this) >= 0)
+        this.moveBlockToHead(storedBlock);
     }
     // collect blocks that have not been reported
     // all of them are next to the delimiter
@@ -405,6 +431,105 @@
     this.removeBlock(delimiter);
   }
 
+  /**
+   * Process a block replica reported by the data-node.
+   * 
+   * <ol>
+   * <li>If the block is not known to the system (not in blocksMap) then the
+   * data-node should be notified to invalidate this block.</li>
+   * <li>If the reported replica is valid, that is, it has the same generation
+   * stamp and length as recorded on the name-node, then the replica location
+   * is added to the name-node.</li>
+   * <li>If the reported replica is not valid, then it is marked as corrupt,
+   * which triggers replication of the existing valid replicas.
+   * Corrupt replicas are removed from the system when the block
+   * is fully replicated.</li>
+   * </ol>
+   * 
+   * @param blockManager the block manager used to look up the stored block
+   * @param block reported block replica
+   * @param rState reported replica state
+   * @param toAdd add to DatanodeDescriptor
+   * @param toInvalidate missing blocks (not in the blocks map) that
+   *        should be removed from the data-node
+   * @param toCorrupt replicas with unexpected length or generation stamp;
+   *        add to corrupt replicas
+   * @return the stored block in the blocks map corresponding to the reported
+   *         replica, or null if the block is not in the map
+   */
+  BlockInfo processReportedBlock(
+                  BlockManager blockManager,
+                  Block block,                // reported block replica
+                  ReplicaState rState,        // reported replica state
+                  Collection<Block> toAdd,    // add to DatanodeDescriptor
+                  Collection<Block> toInvalidate, // should be removed from DN
+                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
+    FSNamesystem.LOG.debug("Reported block " + block
+        + " on " + getName() + " size " + block.getNumBytes()
+        + " replicaState = " + rState);
+
+    // find block by blockId
+    BlockInfo storedBlock = blockManager.findStoredBlock(block.getBlockId());
+    if(storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the replica should be removed from the data-node.
+      toInvalidate.add(new Block(block));
+      return null;
+    }
+
+    FSNamesystem.LOG.debug("In memory blockUCState = " + storedBlock.getBlockUCState());
+
+    // Block is on the DN
+    boolean isCorrupt = false;
+    switch(rState) {
+    case FINALIZED:
+      switch(storedBlock.getBlockUCState()) {
+      case COMPLETE:
+      case COMMITTED:
+        // This is a temporary hack until Block.equals() and compareTo()
+        // are changed not to take into account the generation stamp for searching
+        // in  blocksMap
+        if(storedBlock.getGenerationStamp() != block.getGenerationStamp()) {
+          toInvalidate.add(new Block(block));
+          return storedBlock;
+        }
+
+        if(storedBlock.getGenerationStamp() != block.getGenerationStamp()
+            || storedBlock.getNumBytes() != block.getNumBytes())
+          isCorrupt = true;
+        break;
+      case UNDER_CONSTRUCTION:
+      case UNDER_RECOVERY:
+        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+            this, block, rState);
+      }
+      if(!isCorrupt && storedBlock.findDatanode(this) < 0)
+        if (storedBlock.getNumBytes() != block.getNumBytes()) {
+          toAdd.add(new Block(block));
+        } else {
+          toAdd.add(storedBlock);
+        }
+      break;
+    case RBW:
+    case RWR:
+      if(!storedBlock.isComplete())
+        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+                                                      this, block, rState);
+      else
+        isCorrupt = true;
+      break;
+    case RUR:       // should not be reported
+    case TEMPORARY: // should not be reported
+    default:
+      FSNamesystem.LOG.warn("Unexpected replica state " + rState
+          + " for block: " + storedBlock + 
+          " on " + getName() + " size " + storedBlock.getNumBytes());
+      break;
+    }
+    if(isCorrupt)
+      toCorrupt.add(storedBlock);
+    return storedBlock;
+  }
+
   /** Serialization for FSEditLog */
   void readFieldsFromFSEditLog(DataInput in) throws IOException {
     this.name = DeprecatedUTF8.readString(in);

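The report-processing path above funnels every reported replica into at most one of the toAdd, toInvalidate and toCorrupt collections. The following simplified, self-contained sketch shows only that classification; ReplicaState and the Long block ids are stand-ins, and the real processReportedBlock() also handles generation stamps and attaches RBW/RWR replicas to the under-construction block.

import java.util.Collection;

class ReportedBlockSketch {
  enum ReplicaState { FINALIZED, RBW, RWR, RUR, TEMPORARY }

  static void classify(Long storedId, long storedLen,
                       long reportedId, long reportedLen, ReplicaState state,
                       Collection<Long> toAdd,
                       Collection<Long> toInvalidate,
                       Collection<Long> toCorrupt) {
    if (storedId == null) {
      // block id unknown to the name-node: ask the datanode to delete it
      toInvalidate.add(reportedId);
      return;
    }
    switch (state) {
    case FINALIZED:
      if (storedLen != reportedLen) {
        toCorrupt.add(storedId);   // unexpected length: mark the replica corrupt
      } else {
        toAdd.add(storedId);       // valid finalized replica: record the location
      }
      break;
    case RBW:
    case RWR:
      // tracked on the under-construction block, not added as a finalized location
      break;
    default:
      // RUR and TEMPORARY replicas are not expected in reports
      break;
    }
  }
}
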
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Sep 30 23:39:30 2009
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -191,7 +192,7 @@
    */
   INode unprotectedAddFile( String path, 
                             PermissionStatus permissions,
-                            Block[] blocks, 
+                            BlockInfo[] blocks, 
                             short replication,
                             long modificationTime,
                             long atime,
@@ -261,7 +262,8 @@
         // Add file->block mapping
         INodeFile newF = (INodeFile)newNode;
         for (int i = 0; i < nrBlocks; i++) {
-          newF.setBlock(i, getBlockManager().addINode(blocks[i], newF));
+          BlockInfo blockInfo = new BlockInfo(blocks[i], newF.getReplication());
+          newF.setBlock(i, getBlockManager().addINode(blockInfo, newF));
         }
       }
     }
@@ -271,27 +273,39 @@
   /**
    * Add a block to the file. Returns a reference to the added block.
    */
-  Block addBlock(String path, INode[] inodes, Block block
-      ) throws QuotaExceededException  {
+  BlockInfo addBlock(String path,
+                     INode[] inodes,
+                     Block block,
+                     DatanodeDescriptor targets[]
+  ) throws QuotaExceededException, IOException  {
     waitForReady();
 
     synchronized (rootDir) {
-      INodeFile fileNode = (INodeFile) inodes[inodes.length-1];
+      assert inodes[inodes.length-1].isUnderConstruction() :
+        "INode should correspond to a file under construction";
+      INodeFileUnderConstruction fileINode = 
+        (INodeFileUnderConstruction)inodes[inodes.length-1];
 
       // check quota limits and updated space consumed
       updateCount(inodes, inodes.length-1, 0, 
-                  fileNode.getPreferredBlockSize()*fileNode.getReplication());
-      
-      // associate the new list of blocks with this file
-      BlockInfo blockInfo = getBlockManager().addINode(block, fileNode);
-      fileNode.addBlock(blockInfo);
+                  fileINode.getPreferredBlockSize()*fileINode.getReplication());
+
+      // associate new last block for the file
+      BlockInfoUnderConstruction blockInfo =
+        new BlockInfoUnderConstruction(
+            block,
+            fileINode.getReplication(),
+            BlockUCState.UNDER_CONSTRUCTION,
+            targets);
+      getBlockManager().addINode(blockInfo, fileINode);
+      fileINode.addBlock(blockInfo);
 
       NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
                                     + path + " with " + block
                                     + " block is added to the in-memory "
                                     + "file system");
+      return blockInfo;
     }
-    return block;
   }
 
   /**
@@ -335,7 +349,7 @@
 
     synchronized (rootDir) {
       // modify file-> block and blocksMap
-      fileNode.removeBlock(block);
+      fileNode.removeLastBlock(block);
       getBlockManager().removeBlockFromMap(block);
       // If block is removed from blocksMap remove it from corruptReplicasMap
       getBlockManager().removeFromCorruptReplicasMap(block);
@@ -732,7 +746,7 @@
       }
       
       int index = 0;
-      for (Block b : newnode.getBlocks()) {
+      for (BlockInfo b : newnode.getBlocks()) {
         BlockInfo info = getBlockManager().addINode(b, newnode);
         newnode.setBlock(index, info); // inode refers to the block in BlocksMap
         index++;

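The quota update in addBlock() charges a full preferred block on every replica as soon as the block is allocated, before any data is written. A small illustrative sketch of that arithmetic follows; the values are made up for the example, not HDFS defaults.

class QuotaChargeSketch {
  public static void main(String[] args) {
    long preferredBlockSize = 64L * 1024 * 1024;  // illustrative block size
    short replication = 3;                        // illustrative replication factor
    long diskspaceCharged = preferredBlockSize * replication;
    System.out.println("diskspace charged for the new block: "
        + diskspaceCharged + " bytes");
  }
}
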
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Sep 30 23:39:30 2009
@@ -461,19 +461,9 @@
             blockSize = readLong(in);
           }
           // get blocks
-          Block blocks[] = null;
-          if (logVersion <= -14) {
-            blocks = readBlocks(in);
-          } else {
-            BlockTwo oldblk = new BlockTwo();
-            int num = in.readInt();
-            blocks = new Block[num];
-            for (int i = 0; i < num; i++) {
-              oldblk.readFields(in);
-              blocks[i] = new Block(oldblk.blkid, oldblk.len, 
-                                    Block.GRANDFATHER_GENERATION_STAMP);
-            }
-          }
+          boolean isFileUnderConstruction = (opcode == OP_ADD);
+          BlockInfo blocks[] = 
+            readBlocks(in, logVersion, isFileUnderConstruction, replication);
 
           // Older versions of HDFS does not store the block size in inode.
           // If the file has more than one block, use the size of the
@@ -521,7 +511,7 @@
                                                     path, permissions,
                                                     blocks, replication, 
                                                     mtime, atime, blockSize);
-          if (opcode == OP_ADD) {
+          if (isFileUnderConstruction) {
             numOpAdd++;
             //
             // Replace current node with a INodeUnderConstruction.
@@ -538,7 +528,7 @@
                                       clientMachine, 
                                       null);
             fsDir.replaceNode(path, node, cons);
-            fsNamesys.leaseManager.addLease(cons.clientName, path);
+            fsNamesys.leaseManager.addLease(cons.getClientName(), path);
           }
           break;
         } 
@@ -1247,12 +1237,26 @@
     return Long.parseLong(FSImage.readString(in));
   }
 
-  static private Block[] readBlocks(DataInputStream in) throws IOException {
+  static private BlockInfo[] readBlocks(
+      DataInputStream in,
+      int logVersion,
+      boolean isFileUnderConstruction,
+      short replication) throws IOException {
     int numBlocks = in.readInt();
-    Block[] blocks = new Block[numBlocks];
+    BlockInfo[] blocks = new BlockInfo[numBlocks];
+    Block blk = new Block();
+    BlockTwo oldblk = new BlockTwo();
     for (int i = 0; i < numBlocks; i++) {
-      blocks[i] = new Block();
-      blocks[i].readFields(in);
+      if (logVersion <= -14) {
+        blk.readFields(in);
+      } else {
+        oldblk.readFields(in);
+        blk.set(oldblk.blkid, oldblk.len, Block.GRANDFATHER_GENERATION_STAMP);
+      }
+      if(isFileUnderConstruction && i == numBlocks-1)
+        blocks[i] = new BlockInfoUnderConstruction(blk, replication);
+      else
+        blocks[i] = new BlockInfo(blk, replication);
     }
     return blocks;
   }

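The reworked readBlocks() folds the old per-version branching into the loop and materializes the last block of a file under construction in the under-construction state. Here is a minimal sketch of that control flow, assuming hypothetical SimpleBlock/SimpleUCBlock classes and a simplified on-disk layout in place of BlockInfo, BlockInfoUnderConstruction and the real serialization.

import java.io.DataInput;
import java.io.IOException;

class ReadBlocksSketch {
  static final long GRANDFATHER_GENERATION_STAMP = 0;

  static class SimpleBlock {
    final long id, len, genStamp;
    SimpleBlock(long id, long len, long genStamp) {
      this.id = id; this.len = len; this.genStamp = genStamp;
    }
  }

  static class SimpleUCBlock extends SimpleBlock {
    SimpleUCBlock(long id, long len, long genStamp) { super(id, len, genStamp); }
  }

  static SimpleBlock[] readBlocks(DataInput in, int logVersion,
                                  boolean underConstruction) throws IOException {
    int numBlocks = in.readInt();
    SimpleBlock[] blocks = new SimpleBlock[numBlocks];
    for (int i = 0; i < numBlocks; i++) {
      long id = in.readLong();
      long len = in.readLong();
      // newer layouts (logVersion <= -14) also persist a generation stamp
      long genStamp = (logVersion <= -14)
          ? in.readLong() : GRANDFATHER_GENERATION_STAMP;
      boolean isLast = (i == numBlocks - 1);
      blocks[i] = (underConstruction && isLast)
          ? new SimpleUCBlock(id, len, genStamp)
          : new SimpleBlock(id, len, genStamp);
    }
    return blocks;
  }
}
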
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Sep 30 23:39:30 2009
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -1403,7 +1404,7 @@
       }
       INodeFile oldnode = (INodeFile) old;
       fsDir.replaceNode(path, oldnode, cons);
-      fs.leaseManager.addLease(cons.clientName, path); 
+      fs.leaseManager.addLease(cons.getClientName(), path); 
     }
   }
 
@@ -1419,10 +1420,17 @@
     int numBlocks = in.readInt();
     BlockInfo[] blocks = new BlockInfo[numBlocks];
     Block blk = new Block();
-    for (int i = 0; i < numBlocks; i++) {
+    int i = 0;
+    for (; i < numBlocks-1; i++) {
       blk.readFields(in);
       blocks[i] = new BlockInfo(blk, blockReplication);
     }
+    // last block is UNDER_CONSTRUCTION
+    if(numBlocks > 0) {
+      blk.readFields(in);
+      blocks[i] = new BlockInfoUnderConstruction(
+        blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
+    }
     PermissionStatus perm = PermissionStatus.read(in);
     String clientName = readString(in);
     String clientMachine = readString(in);
@@ -1430,7 +1438,7 @@
     // These locations are not used at all
     int numLocs = in.readInt();
     DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs];
-    for (int i = 0; i < numLocs; i++) {
+    for (i = 0; i < numLocs; i++) {
       locations[i] = new DatanodeDescriptor();
       locations[i].readFields(in);
     }


