hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject [2/2] hadoop git commit: HDFS-8801. Convert BlockInfoUnderConstruction as a feature. Contributed by Jing Zhao.
Date Mon, 17 Aug 2015 18:29:00 GMT
HDFS-8801. Convert BlockInfoUnderConstruction as a feature. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e535e0f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e535e0f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e535e0f0

Branch: refs/heads/trunk
Commit: e535e0f05b5fbd087c93238deb888cc985254b4c
Parents: 456e901
Author: Haohui Mai <wheat9@apache.org>
Authored: Mon Aug 17 11:28:28 2015 -0700
Committer: Haohui Mai <wheat9@apache.org>
Committed: Mon Aug 17 11:28:28 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockCollection.java |   3 +-
 .../hdfs/server/blockmanagement/BlockInfo.java  | 134 ++++--
 .../BlockInfoContiguousUnderConstruction.java   | 403 -------------------
 .../server/blockmanagement/BlockManager.java    |  99 ++---
 .../BlockUnderConstructionFeature.java          | 335 +++++++++++++++
 .../blockmanagement/DatanodeDescriptor.java     |  11 +-
 .../server/blockmanagement/DatanodeManager.java |  19 +-
 .../hdfs/server/namenode/FSDirTruncateOp.java   |  39 +-
 .../hdfs/server/namenode/FSDirWriteFileOp.java  |  24 +-
 .../hdfs/server/namenode/FSEditLogLoader.java   |  25 +-
 .../hdfs/server/namenode/FSImageFormat.java     |   5 +-
 .../server/namenode/FSImageFormatPBINode.java   |   7 +-
 .../server/namenode/FSImageSerialization.java   |  10 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  50 +--
 .../namenode/FileUnderConstructionFeature.java  |   8 +-
 .../hadoop/hdfs/server/namenode/INodeFile.java  |  29 +-
 .../hadoop/hdfs/server/namenode/Namesystem.java |   4 +-
 .../server/namenode/snapshot/FileDiffList.java  |   3 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   9 +-
 .../TestBlockInfoUnderConstruction.java         |  18 +-
 .../blockmanagement/TestBlockManager.java       |   9 +-
 .../blockmanagement/TestHeartbeatHandling.java  |  22 +-
 .../blockmanagement/TestReplicationPolicy.java  |  13 +-
 .../namenode/TestBlockUnderConstruction.java    |   6 +-
 .../TestCommitBlockSynchronization.java         |   9 +-
 .../hdfs/server/namenode/TestFileTruncate.java  |   5 +-
 .../namenode/ha/TestRetryCacheWithHA.java       |  11 +-
 .../namenode/snapshot/SnapshotTestHelper.java   |   5 +-
 29 files changed, 642 insertions(+), 676 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c12e678..5866237 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -796,6 +796,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-6407. Add sorting and pagination in the datanode tab of the NN Web UI.
     (wheat9)
 
+    HDFS-8801. Convert BlockInfoUnderConstruction as a feature.
+    (Jing Zhao via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
index 02a1d05..a3b4401 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
@@ -79,8 +79,7 @@ public interface BlockCollection {
    * Convert the last block of the collection to an under-construction block
    * and set the locations.
    */
-  public BlockInfoContiguousUnderConstruction setLastBlock(BlockInfo lastBlock,
-      DatanodeStorageInfo[] targets) throws IOException;
+  public void convertLastBlockToUC(BlockInfo lastBlock, DatanodeStorageInfo[] targets) throws IOException;
 
   /**
    * @return whether the block collection is under construction.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index dea31c4..94dac35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -17,11 +17,16 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.io.IOException;
 import java.util.LinkedList;
+import java.util.List;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature.ReplicaUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.LightWeightGSet;
 
 /**
@@ -32,6 +37,7 @@ import org.apache.hadoop.util.LightWeightGSet;
 @InterfaceAudience.Private
 public abstract class  BlockInfo extends Block
     implements LightWeightGSet.LinkedElement {
+
   public static final BlockInfo[] EMPTY_ARRAY = {};
 
   private BlockCollection bc;
@@ -53,6 +59,8 @@ public abstract class  BlockInfo extends Block
    */
   protected Object[] triplets;
 
+  private BlockUnderConstructionFeature uc;
+
   /**
    * Construct an entry for blocksmap
    * @param replication the block's replication factor
@@ -181,7 +189,6 @@ public abstract class  BlockInfo extends Block
    */
   abstract boolean removeStorage(DatanodeStorageInfo storage);
 
-
   /**
    * Replace the current BlockInfo with the new one in corresponding
    * DatanodeStorageInfo's linked list
@@ -292,14 +299,36 @@ public abstract class  BlockInfo extends Block
     return this;
   }
 
-  /**
-   * BlockInfo represents a block that is not being constructed.
-   * In order to start modifying the block, the BlockInfo should be converted
-   * to {@link BlockInfoContiguousUnderConstruction}.
-   * @return {@link BlockUCState#COMPLETE}
-   */
+  @Override
+  public int hashCode() {
+    // Super implementation is sufficient
+    return super.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // Sufficient to rely on super's implementation
+    return (this == obj) || super.equals(obj);
+  }
+
+  @Override
+  public LightWeightGSet.LinkedElement getNext() {
+    return nextLinkedElement;
+  }
+
+  @Override
+  public void setNext(LightWeightGSet.LinkedElement next) {
+    this.nextLinkedElement = next;
+  }
+
+  /* UnderConstruction Feature related */
+
+  public BlockUnderConstructionFeature getUnderConstructionFeature() {
+    return uc;
+  }
+
   public BlockUCState getBlockUCState() {
-    return BlockUCState.COMPLETE;
+    return uc == null ? BlockUCState.COMPLETE : uc.getBlockUCState();
   }
 
   /**
@@ -312,46 +341,69 @@ public abstract class  BlockInfo extends Block
   }
 
   /**
-   * Convert a complete block to an under construction block.
-   * @return BlockInfoUnderConstruction -  an under construction block.
+   * Add/Update the under construction feature.
    */
-  public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction(
-      BlockUCState s, DatanodeStorageInfo[] targets) {
-    if(isComplete()) {
-      BlockInfoContiguousUnderConstruction ucBlock =
-          new BlockInfoContiguousUnderConstruction(this,
-          getBlockCollection().getPreferredBlockReplication(), s, targets);
-      ucBlock.setBlockCollection(getBlockCollection());
-      return ucBlock;
+  public void convertToBlockUnderConstruction(BlockUCState s,
+      DatanodeStorageInfo[] targets) {
+    if (isComplete()) {
+      uc = new BlockUnderConstructionFeature(this, s, targets);
+    } else {
+      // the block is already under construction
+      uc.setBlockUCState(s);
+      uc.setExpectedLocations(this.getGenerationStamp(), targets);
     }
-    // the block is already under construction
-    BlockInfoContiguousUnderConstruction ucBlock =
-        (BlockInfoContiguousUnderConstruction)this;
-    ucBlock.setBlockUCState(s);
-    ucBlock.setExpectedLocations(targets);
-    ucBlock.setBlockCollection(getBlockCollection());
-    return ucBlock;
-  }
-
-  @Override
-  public int hashCode() {
-    // Super implementation is sufficient
-    return super.hashCode();
   }
 
-  @Override
-  public boolean equals(Object obj) {
-    // Sufficient to rely on super's implementation
-    return (this == obj) || super.equals(obj);
+  /**
+   * Convert an under construction block to a complete block.
+   *
+   * @return BlockInfo - a complete block.
+   * @throws IOException if the state of the block
+   * (the generation stamp and the length) has not been committed by
+   * the client or it does not have at least a minimal number of replicas
+   * reported from data-nodes.
+   */
+  BlockInfo convertToCompleteBlock() throws IOException {
+    assert getBlockUCState() != BlockUCState.COMPLETE :
+        "Trying to convert a COMPLETE block";
+    uc = null;
+    return this;
   }
 
-  @Override
-  public LightWeightGSet.LinkedElement getNext() {
-    return nextLinkedElement;
+  /**
+   * Process the recorded replicas. When about to commit or finish the
+   * pipeline recovery, sort out bad replicas.
+   * @param genStamp  The final generation stamp for the block.
+   */
+  public void setGenerationStampAndVerifyReplicas(long genStamp) {
+    Preconditions.checkState(uc != null && !isComplete());
+    // Set the generation stamp for the block.
+    setGenerationStamp(genStamp);
+
+    // Remove the replicas with wrong gen stamp
+    List<ReplicaUnderConstruction> staleReplicas = uc.getStaleReplicas(genStamp);
+    for (ReplicaUnderConstruction r : staleReplicas) {
+      r.getExpectedStorageLocation().removeBlock(this);
+      NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica "
+          + "from location: {}", r.getExpectedStorageLocation());
+    }
   }
 
-  @Override
-  public void setNext(LightWeightGSet.LinkedElement next) {
-    this.nextLinkedElement = next;
+  /**
+   * Commit block's length and generation stamp as reported by the client.
+   * Set block state to {@link BlockUCState#COMMITTED}.
+   * @param block - contains client reported block length and generation stamp
+   * @throws IOException if block ids are inconsistent.
+   */
+  void commitBlock(Block block) throws IOException {
+    if (getBlockId() != block.getBlockId()) {
+      throw new IOException("Trying to commit inconsistent block: id = "
+          + block.getBlockId() + ", expected id = " + getBlockId());
+    }
+    Preconditions.checkState(!isComplete());
+    uc.commit();
+    this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+    // Sort out invalid replicas.
+    setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
deleted file mode 100644
index 7ca6419..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-
-/**
- * Represents a block that is currently being constructed.<br>
- * This is usually the last block of a file opened for write or append.
- */
-public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
-  /** Block state. See {@link BlockUCState} */
-  private BlockUCState blockUCState;
-
-  /**
-   * Block replicas as assigned when the block was allocated.
-   * This defines the pipeline order.
-   */
-  private List<ReplicaUnderConstruction> replicas;
-
-  /**
-   * Index of the primary data node doing the recovery. Useful for log
-   * messages.
-   */
-  private int primaryNodeIndex = -1;
-
-  /**
-   * The new generation stamp, which this block will have
-   * after the recovery succeeds. Also used as a recovery id to identify
-   * the right recovery if any of the abandoned recoveries re-appear.
-   */
-  private long blockRecoveryId = 0;
-
-  /**
-   * The block source to use in the event of copy-on-write truncate.
-   */
-  private Block truncateBlock;
-
-  /**
-   * ReplicaUnderConstruction contains information about replicas while
-   * they are under construction.
-   * The GS, the length and the state of the replica is as reported by 
-   * the data-node.
-   * It is not guaranteed, but expected, that data-nodes actually have
-   * corresponding replicas.
-   */
-  static class ReplicaUnderConstruction extends Block {
-    private final DatanodeStorageInfo expectedLocation;
-    private ReplicaState state;
-    private boolean chosenAsPrimary;
-
-    ReplicaUnderConstruction(Block block,
-                             DatanodeStorageInfo target,
-                             ReplicaState state) {
-      super(block);
-      this.expectedLocation = target;
-      this.state = state;
-      this.chosenAsPrimary = false;
-    }
-
-    /**
-     * Expected block replica location as assigned when the block was allocated.
-     * This defines the pipeline order.
-     * It is not guaranteed, but expected, that the data-node actually has
-     * the replica.
-     */
-    private DatanodeStorageInfo getExpectedStorageLocation() {
-      return expectedLocation;
-    }
-
-    /**
-     * Get replica state as reported by the data-node.
-     */
-    ReplicaState getState() {
-      return state;
-    }
-
-    /**
-     * Whether the replica was chosen for recovery.
-     */
-    boolean getChosenAsPrimary() {
-      return chosenAsPrimary;
-    }
-
-    /**
-     * Set replica state.
-     */
-    void setState(ReplicaState s) {
-      state = s;
-    }
-
-    /**
-     * Set whether this replica was chosen for recovery.
-     */
-    void setChosenAsPrimary(boolean chosenAsPrimary) {
-      this.chosenAsPrimary = chosenAsPrimary;
-    }
-
-    /**
-     * Is data-node the replica belongs to alive.
-     */
-    boolean isAlive() {
-      return expectedLocation.getDatanodeDescriptor().isAlive;
-    }
-
-    @Override // Block
-    public int hashCode() {
-      return super.hashCode();
-    }
-
-    @Override // Block
-    public boolean equals(Object obj) {
-      // Sufficient to rely on super's implementation
-      return (this == obj) || super.equals(obj);
-    }
-
-    @Override
-    public String toString() {
-      final StringBuilder b = new StringBuilder(50);
-      appendStringTo(b);
-      return b.toString();
-    }
-    
-    @Override
-    public void appendStringTo(StringBuilder sb) {
-      sb.append("ReplicaUC[")
-        .append(expectedLocation)
-        .append("|")
-        .append(state)
-        .append("]");
-    }
-  }
-
-  /**
-   * Create block and set its state to
-   * {@link BlockUCState#UNDER_CONSTRUCTION}.
-   */
-  public BlockInfoContiguousUnderConstruction(Block blk, short replication) {
-    this(blk, replication, BlockUCState.UNDER_CONSTRUCTION, null);
-  }
-
-  /**
-   * Create a block that is currently being constructed.
-   */
-  public BlockInfoContiguousUnderConstruction(Block blk, short replication,
-      BlockUCState state, DatanodeStorageInfo[] targets) {
-    super(blk, replication);
-    assert getBlockUCState() != BlockUCState.COMPLETE :
-      "BlockInfoUnderConstruction cannot be in COMPLETE state";
-    this.blockUCState = state;
-    setExpectedLocations(targets);
-  }
-
-  /**
-   * Convert an under construction block to a complete block.
-   * 
-   * @return BlockInfo - a complete block.
-   * @throws IOException if the state of the block 
-   * (the generation stamp and the length) has not been committed by 
-   * the client or it does not have at least a minimal number of replicas 
-   * reported from data-nodes. 
-   */
-  BlockInfo convertToCompleteBlock() throws IOException {
-    assert getBlockUCState() != BlockUCState.COMPLETE :
-      "Trying to convert a COMPLETE block";
-    return new BlockInfoContiguous(this);
-  }
-
-  /** Set expected locations */
-  public void setExpectedLocations(DatanodeStorageInfo[] targets) {
-    int numLocations = targets == null ? 0 : targets.length;
-    this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
-    for(int i = 0; i < numLocations; i++)
-      replicas.add(
-        new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
-  }
-
-  /**
-   * Create array of expected replica locations
-   * (as has been assigned by chooseTargets()).
-   */
-  public DatanodeStorageInfo[] getExpectedStorageLocations() {
-    int numLocations = replicas == null ? 0 : replicas.size();
-    DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
-    for(int i = 0; i < numLocations; i++)
-      storages[i] = replicas.get(i).getExpectedStorageLocation();
-    return storages;
-  }
-
-  /** Get the number of expected locations */
-  public int getNumExpectedLocations() {
-    return replicas == null ? 0 : replicas.size();
-  }
-
-  /**
-   * Return the state of the block under construction.
-   * @see BlockUCState
-   */
-  @Override // BlockInfo
-  public BlockUCState getBlockUCState() {
-    return blockUCState;
-  }
-
-  void setBlockUCState(BlockUCState s) {
-    blockUCState = s;
-  }
-
-  /** Get block recovery ID */
-  public long getBlockRecoveryId() {
-    return blockRecoveryId;
-  }
-
-  /** Get recover block */
-  public Block getTruncateBlock() {
-    return truncateBlock;
-  }
-
-  public void setTruncateBlock(Block recoveryBlock) {
-    this.truncateBlock = recoveryBlock;
-  }
-
-  /**
-   * Process the recorded replicas. When about to commit or finish the
-   * pipeline recovery sort out bad replicas.
-   * @param genStamp  The final generation stamp for the block.
-   */
-  public void setGenerationStampAndVerifyReplicas(long genStamp) {
-    // Set the generation stamp for the block.
-    setGenerationStamp(genStamp);
-    if (replicas == null)
-      return;
-
-    // Remove the replicas with wrong gen stamp.
-    // The replica list is unchanged.
-    for (ReplicaUnderConstruction r : replicas) {
-      if (genStamp != r.getGenerationStamp()) {
-        r.getExpectedStorageLocation().removeBlock(this);
-        NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica "
-            + "from location: {}", r.getExpectedStorageLocation());
-      }
-    }
-  }
-
-  /**
-   * Commit block's length and generation stamp as reported by the client.
-   * Set block state to {@link BlockUCState#COMMITTED}.
-   * @param block - contains client reported block length and generation 
-   * @throws IOException if block ids are inconsistent.
-   */
-  void commitBlock(Block block) throws IOException {
-    if(getBlockId() != block.getBlockId())
-      throw new IOException("Trying to commit inconsistent block: id = "
-          + block.getBlockId() + ", expected id = " + getBlockId());
-    blockUCState = BlockUCState.COMMITTED;
-    this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
-    // Sort out invalid replicas.
-    setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
-  }
-
-  /**
-   * Initialize lease recovery for this block.
-   * Find the first alive data-node starting from the previous primary and
-   * make it primary.
-   */
-  public void initializeBlockRecovery(long recoveryId) {
-    setBlockUCState(BlockUCState.UNDER_RECOVERY);
-    blockRecoveryId = recoveryId;
-    if (replicas.size() == 0) {
-      NameNode.blockStateChangeLog.warn("BLOCK*"
-        + " BlockInfoUnderConstruction.initLeaseRecovery:"
-        + " No blocks found, lease removed.");
-    }
-    boolean allLiveReplicasTriedAsPrimary = true;
-    for (int i = 0; i < replicas.size(); i++) {
-      // Check if all replicas have been tried or not.
-      if (replicas.get(i).isAlive()) {
-        allLiveReplicasTriedAsPrimary =
-            (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
-      }
-    }
-    if (allLiveReplicasTriedAsPrimary) {
-      // Just set all the replicas to be chosen whether they are alive or not.
-      for (int i = 0; i < replicas.size(); i++) {
-        replicas.get(i).setChosenAsPrimary(false);
-      }
-    }
-    long mostRecentLastUpdate = 0;
-    ReplicaUnderConstruction primary = null;
-    primaryNodeIndex = -1;
-    for(int i = 0; i < replicas.size(); i++) {
-      // Skip alive replicas which have been chosen for recovery.
-      if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
-        continue;
-      }
-      final ReplicaUnderConstruction ruc = replicas.get(i);
-      final long lastUpdate = ruc.getExpectedStorageLocation()
-          .getDatanodeDescriptor().getLastUpdateMonotonic();
-      if (lastUpdate > mostRecentLastUpdate) {
-        primaryNodeIndex = i;
-        primary = ruc;
-        mostRecentLastUpdate = lastUpdate;
-      }
-    }
-    if (primary != null) {
-      primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this);
-      primary.setChosenAsPrimary(true);
-      NameNode.blockStateChangeLog.debug(
-          "BLOCK* {} recovery started, primary={}", this, primary);
-    }
-  }
-
-  void addReplicaIfNotPresent(DatanodeStorageInfo storage,
-                     Block block,
-                     ReplicaState rState) {
-    Iterator<ReplicaUnderConstruction> it = replicas.iterator();
-    while (it.hasNext()) {
-      ReplicaUnderConstruction r = it.next();
-      DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation();
-      if(expectedLocation == storage) {
-        // Record the gen stamp from the report
-        r.setGenerationStamp(block.getGenerationStamp());
-        return;
-      } else if (expectedLocation != null &&
-                 expectedLocation.getDatanodeDescriptor() ==
-                     storage.getDatanodeDescriptor()) {
-
-        // The Datanode reported that the block is on a different storage
-        // than the one chosen by BlockPlacementPolicy. This can occur as
-        // we allow Datanodes to choose the target storage. Update our
-        // state by removing the stale entry and adding a new one.
-        it.remove();
-        break;
-      }
-    }
-    replicas.add(new ReplicaUnderConstruction(block, storage, rState));
-  }
-
-  @Override // BlockInfo
-  // BlockInfoUnderConstruction participates in maps the same way as BlockInfo
-  public int hashCode() {
-    return super.hashCode();
-  }
-
-  @Override // BlockInfo
-  public boolean equals(Object obj) {
-    // Sufficient to rely on super's implementation
-    return (this == obj) || super.equals(obj);
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder b = new StringBuilder(100);
-    appendStringTo(b);
-    return b.toString();
-  }
-
-  @Override
-  public void appendStringTo(StringBuilder sb) {
-    super.appendStringTo(sb);
-    appendUCParts(sb);
-  }
-
-  private void appendUCParts(StringBuilder sb) {
-    sb.append("{UCState=").append(blockUCState)
-      .append(", truncateBlock=" + truncateBlock)
-      .append(", primaryNodeIndex=").append(primaryNodeIndex)
-      .append(", replicas=[");
-    if (replicas != null) {
-      Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
-      if (iter.hasNext()) {
-        iter.next().appendStringTo(sb);
-        while (iter.hasNext()) {
-          sb.append(", ");
-          iter.next().appendStringTo(sb);
-        }
-      }
-    }
-    sb.append("]}");
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 508da85..fa51491 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.IOException;
@@ -617,9 +616,8 @@ public class BlockManager implements BlockStatsMXBean {
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  private static boolean commitBlock(
-      final BlockInfoContiguousUnderConstruction block, final Block commitBlock)
-      throws IOException {
+  private static boolean commitBlock(final BlockInfo block,
+      final Block commitBlock) throws IOException {
     if (block.getBlockUCState() == BlockUCState.COMMITTED)
       return false;
     assert block.getNumBytes() <= commitBlock.getNumBytes() :
@@ -649,8 +647,7 @@ public class BlockManager implements BlockStatsMXBean {
     if(lastBlock.isComplete())
       return false; // already completed (e.g. by syncBlock)
     
-    final boolean b = commitBlock(
-        (BlockInfoContiguousUnderConstruction) lastBlock, commitBlock);
+    final boolean b = commitBlock(lastBlock, commitBlock);
     if(countNodes(lastBlock).liveReplicas() >= minReplication)
       completeBlock(bc, bc.numBlocks()-1, false);
     return b;
@@ -670,16 +667,15 @@ public class BlockManager implements BlockStatsMXBean {
     BlockInfo curBlock = bc.getBlocks()[blkIndex];
     if(curBlock.isComplete())
       return curBlock;
-    BlockInfoContiguousUnderConstruction ucBlock =
-        (BlockInfoContiguousUnderConstruction) curBlock;
-    int numNodes = ucBlock.numNodes();
+
+    int numNodes = curBlock.numNodes();
     if (!force && numNodes < minReplication)
       throw new IOException("Cannot complete block: " +
           "block does not satisfy minimal replication requirement.");
-    if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
+    if(!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED)
       throw new IOException(
           "Cannot complete block: block has not been COMMITTED by the client");
-    BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
+    BlockInfo completeBlock = curBlock.convertToCompleteBlock();
     // replace penultimate block in file
     bc.setBlock(blkIndex, completeBlock);
     
@@ -713,7 +709,7 @@ public class BlockManager implements BlockStatsMXBean {
    * when tailing edit logs as a Standby.
    */
   public BlockInfo forceCompleteBlock(final BlockCollection bc,
-      final BlockInfoContiguousUnderConstruction block) throws IOException {
+      final BlockInfo block) throws IOException {
     block.commitBlock(block);
     return completeBlock(bc, block, true);
   }
@@ -735,28 +731,29 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public LocatedBlock convertLastBlockToUnderConstruction(
       BlockCollection bc, long bytesToRemove) throws IOException {
-    BlockInfo oldBlock = bc.getLastBlock();
-    if(oldBlock == null ||
-       bc.getPreferredBlockSize() == oldBlock.getNumBytes() - bytesToRemove)
+    BlockInfo lastBlock = bc.getLastBlock();
+    if (lastBlock == null ||
+       bc.getPreferredBlockSize() == lastBlock.getNumBytes() - bytesToRemove) {
       return null;
-    assert oldBlock == getStoredBlock(oldBlock) :
+    }
+    assert lastBlock == getStoredBlock(lastBlock) :
       "last block of the file is not in blocksMap";
 
-    DatanodeStorageInfo[] targets = getStorages(oldBlock);
+    DatanodeStorageInfo[] targets = getStorages(lastBlock);
 
-    BlockInfoContiguousUnderConstruction ucBlock =
-        bc.setLastBlock(oldBlock, targets);
-    blocksMap.replaceBlock(ucBlock);
+    // convert the last block to under construction. note no block replacement
+    // is happening
+    bc.convertLastBlockToUC(lastBlock, targets);
 
     // Remove block from replication queue.
-    NumberReplicas replicas = countNodes(ucBlock);
-    neededReplications.remove(ucBlock, replicas.liveReplicas(),
-        replicas.decommissionedAndDecommissioning(), getReplication(ucBlock));
-    pendingReplications.remove(ucBlock);
+    NumberReplicas replicas = countNodes(lastBlock);
+    neededReplications.remove(lastBlock, replicas.liveReplicas(),
+        replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
+    pendingReplications.remove(lastBlock);
 
     // remove this block from the list of pending blocks to be deleted. 
     for (DatanodeStorageInfo storage : targets) {
-      invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock);
+      invalidateBlocks.remove(storage.getDatanodeDescriptor(), lastBlock);
     }
     
     // Adjust safe-mode totals, since under-construction blocks don't
@@ -767,9 +764,11 @@ public class BlockManager implements BlockStatsMXBean {
         // always decrement total blocks
         -1);
 
-    final long fileLength = bc.computeContentSummary(getStoragePolicySuite()).getLength();
-    final long pos = fileLength - ucBlock.getNumBytes();
-    return createLocatedBlock(ucBlock, pos, BlockTokenIdentifier.AccessMode.WRITE);
+    final long fileLength = bc.computeContentSummary(
+        getStoragePolicySuite()).getLength();
+    final long pos = fileLength - lastBlock.getNumBytes();
+    return createLocatedBlock(lastBlock, pos,
+        BlockTokenIdentifier.AccessMode.WRITE);
   }
 
   /**
@@ -846,15 +845,9 @@ public class BlockManager implements BlockStatsMXBean {
   /** @return a LocatedBlock for the given block */
   private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
       ) throws IOException {
-    if (blk instanceof BlockInfoContiguousUnderConstruction) {
-      if (blk.isComplete()) {
-        throw new IOException(
-            "blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
-            + ", blk=" + blk);
-      }
-      final BlockInfoContiguousUnderConstruction uc =
-          (BlockInfoContiguousUnderConstruction) blk;
-      final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
+    if (!blk.isComplete()) {
+      final DatanodeStorageInfo[] storages = blk.getUnderConstructionFeature()
+          .getExpectedStorageLocations();
       final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
       return newLocatedBlock(eb, storages, pos, false);
     }
@@ -1761,12 +1754,13 @@ public class BlockManager implements BlockStatsMXBean {
    * reported by the datanode in the block report. 
    */
   static class StatefulBlockInfo {
-    final BlockInfoContiguousUnderConstruction storedBlock;
+    final BlockInfo storedBlock;
     final Block reportedBlock;
     final ReplicaState reportedState;
     
-    StatefulBlockInfo(BlockInfoContiguousUnderConstruction storedBlock,
+    StatefulBlockInfo(BlockInfo storedBlock,
         Block reportedBlock, ReplicaState reportedState) {
+      Preconditions.checkArgument(!storedBlock.isComplete());
       this.storedBlock = storedBlock;
       this.reportedBlock = reportedBlock;
       this.reportedState = reportedState;
@@ -2165,15 +2159,14 @@ public class BlockManager implements BlockStatsMXBean {
       
       // If block is under construction, add this replica to its list
       if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
-        ((BlockInfoContiguousUnderConstruction)storedBlock)
+        storedBlock.getUnderConstructionFeature()
             .addReplicaIfNotPresent(storageInfo, iblk, reportedState);
         // OpenFileBlocks only inside snapshots also will be added to safemode
         // threshold. So we need to update such blocks to safemode
         // refer HDFS-5283
-        BlockInfoContiguousUnderConstruction blockUC =
-            (BlockInfoContiguousUnderConstruction) storedBlock;
-        if (namesystem.isInSnapshot(blockUC)) {
-          int numOfReplicas = blockUC.getNumExpectedLocations();
+        if (namesystem.isInSnapshot(storedBlock)) {
+          int numOfReplicas = storedBlock.getUnderConstructionFeature()
+              .getNumExpectedLocations();
           namesystem.incrementSafeBlockCount(numOfReplicas);
         }
         //and fall through to next clause
@@ -2298,11 +2291,6 @@ public class BlockManager implements BlockStatsMXBean {
 
     // Ignore replicas already scheduled to be removed from the DN
     if(invalidateBlocks.contains(dn, block)) {
-      /*
-       * TODO: following assertion is incorrect, see HDFS-2668 assert
-       * storedBlock.findDatanode(dn) < 0 : "Block " + block +
-       * " in recentInvalidatesSet should not appear in DN " + dn;
-       */
       return storedBlock;
     }
 
@@ -2325,8 +2313,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
-      toUC.add(new StatefulBlockInfo(
-          (BlockInfoContiguousUnderConstruction) storedBlock,
+      toUC.add(new StatefulBlockInfo(storedBlock,
           new Block(block), reportedState));
       return storedBlock;
     }
@@ -2517,8 +2504,8 @@ public class BlockManager implements BlockStatsMXBean {
 
   void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
       DatanodeStorageInfo storageInfo) throws IOException {
-    BlockInfoContiguousUnderConstruction block = ucBlock.storedBlock;
-    block.addReplicaIfNotPresent(
+    BlockInfo block = ucBlock.storedBlock;
+    block.getUnderConstructionFeature().addReplicaIfNotPresent(
         storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
 
     if (ucBlock.reportedState == ReplicaState.FINALIZED &&
@@ -2578,7 +2565,7 @@ public class BlockManager implements BlockStatsMXBean {
     assert block != null && namesystem.hasWriteLock();
     BlockInfo storedBlock;
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
-    if (block instanceof BlockInfoContiguousUnderConstruction) {
+    if (!block.isComplete()) {
       //refresh our copy in case the block got completed in another thread
       storedBlock = blocksMap.getStoredBlock(block);
     } else {
@@ -3533,11 +3520,9 @@ public class BlockManager implements BlockStatsMXBean {
       String src, BlockInfo[] blocks) {
     for (BlockInfo b: blocks) {
       if (!b.isComplete()) {
-        final BlockInfoContiguousUnderConstruction uc =
-            (BlockInfoContiguousUnderConstruction)b;
         final int numNodes = b.numNodes();
         LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = "
-          + uc.getBlockUCState() + ", replication# = " + numNodes
+          + b.getBlockUCState() + ", replication# = " + numNodes
           + (numNodes < minReplication ? " < ": " >= ")
           + " minimum = " + minReplication + ") in file " + src);
         return false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
new file mode 100644
index 0000000..a555cd6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+/**
+ * Holds the under-construction state of a block.<br>
+ * This is usually the last block of a file opened for write or append.
+ */
+public class BlockUnderConstructionFeature {
+  /** Block state. See {@link BlockUCState} */
+  private BlockUCState blockUCState;
+
+  /**
+   * Block replicas as assigned when the block was allocated.
+   * This defines the pipeline order.
+   */
+  private List<ReplicaUnderConstruction> replicas;
+
+  /**
+   * Index of the primary data node doing the recovery. Useful for log
+   * messages.
+   */
+  private int primaryNodeIndex = -1;
+
+  /**
+   * The new generation stamp, which this block will have
+   * after the recovery succeeds. Also used as a recovery id to identify
+   * the right recovery if any of the abandoned recoveries re-appear.
+   */
+  private long blockRecoveryId = 0;
+
+  /**
+   * The block source to use in the event of copy-on-write truncate.
+   */
+  private Block truncateBlock;
+
+  /**
+   * ReplicaUnderConstruction contains information about replicas while
+   * they are under construction.
+   * The GS, the length and the state of the replica is as reported by
+   * the data-node.
+   * It is not guaranteed, but expected, that data-nodes actually have
+   * corresponding replicas.
+   */
+  static class ReplicaUnderConstruction {
+    private long generationStamp;
+    private final DatanodeStorageInfo expectedLocation;
+    private ReplicaState state;
+    private boolean chosenAsPrimary;
+
+    ReplicaUnderConstruction(long generationStamp, DatanodeStorageInfo target,
+        ReplicaState state) {
+      this.generationStamp = generationStamp;
+      this.expectedLocation = target;
+      this.state = state;
+      this.chosenAsPrimary = false;
+    }
+
+    long getGenerationStamp() {
+      return this.generationStamp;
+    }
+
+    void setGenerationStamp(long generationStamp) {
+      this.generationStamp = generationStamp;
+    }
+
+    /**
+     * Expected block replica location as assigned when the block was allocated.
+     * This defines the pipeline order.
+     * It is not guaranteed, but expected, that the data-node actually has
+     * the replica.
+     */
+    DatanodeStorageInfo getExpectedStorageLocation() {
+      return expectedLocation;
+    }
+
+    /**
+     * Get replica state as reported by the data-node.
+     */
+    ReplicaState getState() {
+      return state;
+    }
+
+    /**
+     * Whether the replica was chosen for recovery.
+     */
+    boolean getChosenAsPrimary() {
+      return chosenAsPrimary;
+    }
+
+    /**
+     * Set replica state.
+     */
+    void setState(ReplicaState s) {
+      state = s;
+    }
+
+    /**
+     * Set whether this replica was chosen for recovery.
+     */
+    void setChosenAsPrimary(boolean chosenAsPrimary) {
+      this.chosenAsPrimary = chosenAsPrimary;
+    }
+
+    /**
+     * Is data-node the replica belongs to alive.
+     */
+    boolean isAlive() {
+      return expectedLocation.getDatanodeDescriptor().isAlive;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder b = new StringBuilder(50)
+          .append("ReplicaUC[")
+          .append(expectedLocation)
+          .append("|")
+          .append(state)
+          .append("]");
+      return b.toString();
+    }
+  }
+
+  /**
+   * Create the feature for a block being built; state must not be COMPLETE.
+   */
+  public BlockUnderConstructionFeature(Block block, BlockUCState state,
+      DatanodeStorageInfo[] targets) {
+    assert state != BlockUCState.COMPLETE :
+      "BlockUnderConstructionFeature cannot be in COMPLETE state";
+    this.blockUCState = state;
+    setExpectedLocations(block.getGenerationStamp(), targets);
+  }
+
+  /** Set expected locations */
+  public void setExpectedLocations(long generationStamp,
+      DatanodeStorageInfo[] targets) {
+    int numLocations = targets == null ? 0 : targets.length;
+    this.replicas = new ArrayList<>(numLocations);
+    for(int i = 0; i < numLocations; i++) {
+      replicas.add(new ReplicaUnderConstruction(generationStamp, targets[i],
+          ReplicaState.RBW));
+    }
+  }
+
+  /**
+   * Create array of expected replica locations
+   * (as has been assigned by chooseTargets()).
+   */
+  public DatanodeStorageInfo[] getExpectedStorageLocations() {
+    int numLocations = replicas == null ? 0 : replicas.size();
+    DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
+    for (int i = 0; i < numLocations; i++) {
+      storages[i] = replicas.get(i).getExpectedStorageLocation();
+    }
+    return storages;
+  }
+
+  /** Get the number of expected locations */
+  public int getNumExpectedLocations() {
+    return replicas == null ? 0 : replicas.size();
+  }
+
+  /**
+   * Return the state of the block under construction.
+   * @see BlockUCState
+   */
+  public BlockUCState getBlockUCState() {
+    return blockUCState;
+  }
+
+  void setBlockUCState(BlockUCState s) {
+    blockUCState = s;
+  }
+
+  /** Get block recovery ID */
+  public long getBlockRecoveryId() {
+    return blockRecoveryId;
+  }
+
+  /** Get the block recorded for truncate recovery; null if none was set */
+  public Block getTruncateBlock() {
+    return truncateBlock;
+  }
+
+  public void setTruncateBlock(Block recoveryBlock) {
+    this.truncateBlock = recoveryBlock;
+  }
+
+  /**
+   * Set {@link #blockUCState} to {@link BlockUCState#COMMITTED}.
+   */
+  void commit() {
+    blockUCState = BlockUCState.COMMITTED;
+  }
+
+  List<ReplicaUnderConstruction> getStaleReplicas(long genStamp) {
+    List<ReplicaUnderConstruction> staleReplicas = new ArrayList<>();
+    if (replicas != null) {
+      // Collect replicas with wrong gen stamp. The replica list is unchanged.
+      for (ReplicaUnderConstruction r : replicas) {
+        if (genStamp != r.getGenerationStamp()) {
+          staleReplicas.add(r);
+        }
+      }
+    }
+    return staleReplicas;
+  }
+
+  /**
+   * Initialize lease recovery for this block.
+   * Among the alive replicas not yet tried as primary, make the one whose
+   * data-node reports the largest last-update time the primary.
+   */
+  public void initializeBlockRecovery(BlockInfo block, long recoveryId) {
+    setBlockUCState(BlockUCState.UNDER_RECOVERY);
+    blockRecoveryId = recoveryId;
+    if (replicas.isEmpty()) {
+      NameNode.blockStateChangeLog.warn("BLOCK*"
+        + " BlockUnderConstructionFeature.initializeBlockRecovery:"
+        + " No blocks found, lease removed.");
+    }
+    boolean allLiveReplicasTriedAsPrimary = true;
+    for (ReplicaUnderConstruction replica : replicas) {
+      // Check whether every alive replica has already been tried as primary.
+      if (replica.isAlive()) {
+        allLiveReplicasTriedAsPrimary = allLiveReplicasTriedAsPrimary
+            && replica.getChosenAsPrimary();
+      }
+    }
+    if (allLiveReplicasTriedAsPrimary) {
+      // Clear the chosen flag on all replicas, whether alive or not.
+      for (ReplicaUnderConstruction replica : replicas) {
+        replica.setChosenAsPrimary(false);
+      }
+    }
+    long mostRecentLastUpdate = 0;
+    ReplicaUnderConstruction primary = null;
+    primaryNodeIndex = -1;
+    for (int i = 0; i < replicas.size(); i++) {
+      final ReplicaUnderConstruction ruc = replicas.get(i);
+      // Skip replicas that are not alive or were already chosen as primary.
+      if (!ruc.isAlive() || ruc.getChosenAsPrimary()) {
+        continue;
+      }
+      final long lastUpdate = ruc.getExpectedStorageLocation()
+          .getDatanodeDescriptor().getLastUpdateMonotonic();
+      if (lastUpdate > mostRecentLastUpdate) {
+        primaryNodeIndex = i;
+        primary = ruc;
+        mostRecentLastUpdate = lastUpdate;
+      }
+    }
+    if (primary != null) {
+      primary.getExpectedStorageLocation().getDatanodeDescriptor()
+          .addBlockToBeRecovered(block);
+      primary.setChosenAsPrimary(true);
+      NameNode.blockStateChangeLog.debug(
+          "BLOCK* {} recovery started, primary={}", this, primary);
+    }
+  }
+
+  void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block block,
+      ReplicaState rState) {
+    Iterator<ReplicaUnderConstruction> it = replicas.iterator();
+    while (it.hasNext()) {
+      ReplicaUnderConstruction r = it.next();
+      DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation();
+      if (expectedLocation == storage) {
+        // Record the gen stamp from the report
+        r.setGenerationStamp(block.getGenerationStamp());
+        return;
+      } else if (expectedLocation != null &&
+                 expectedLocation.getDatanodeDescriptor() ==
+                     storage.getDatanodeDescriptor()) {
+        // The Datanode reported that the block is on a different storage
+        // than the one chosen by BlockPlacementPolicy. This can occur as
+        // we allow Datanodes to choose the target storage. Update our
+        // state by removing the stale entry and adding a new one.
+        it.remove();
+        break;
+      }
+    }
+    replicas.add(new ReplicaUnderConstruction(block.getGenerationStamp(), storage, rState));
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(100);
+    appendUCParts(b);
+    return b.toString();
+  }
+
+  private void appendUCParts(StringBuilder sb) {
+    sb.append("{UCState=").append(blockUCState)
+      .append(", truncateBlock=").append(truncateBlock)
+      .append(", primaryNodeIndex=").append(primaryNodeIndex)
+      .append(", replicas=[");
+    if (replicas != null) {
+      Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
+      if (iter.hasNext()) {
+        sb.append(iter.next());
+        while (iter.hasNext()) {
+          sb.append(", ");
+          sb.append(iter.next());
+        }
+      }
+    }
+    sb.append("]}");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 87ce753..9334b5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -222,8 +222,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   private final BlockQueue<BlockTargetPair> replicateBlocks =
       new BlockQueue<>();
   /** A queue of blocks to be recovered by this datanode */
-  private final BlockQueue<BlockInfoContiguousUnderConstruction> recoverBlocks =
-                                new BlockQueue<BlockInfoContiguousUnderConstruction>();
+  private final BlockQueue<BlockInfo> recoverBlocks = new BlockQueue<>();
   /** A set of blocks to be invalidated by this datanode */
   private final LightWeightHashSet<Block> invalidateBlocks =
       new LightWeightHashSet<>();
@@ -602,7 +601,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /**
    * Store block recovery work.
    */
-  void addBlockToBeRecovered(BlockInfoContiguousUnderConstruction block) {
+  void addBlockToBeRecovered(BlockInfo block) {
     if(recoverBlocks.contains(block)) {
       // this prevents adding the same block twice to the recovery queue
       BlockManager.LOG.info(block + " is already in the recovery queue");
@@ -644,11 +643,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return replicateBlocks.poll(maxTransfers);
   }
 
-  public BlockInfoContiguousUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
-    List<BlockInfoContiguousUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
+  public BlockInfo[] getLeaseRecoveryCommand(int maxTransfers) {
+    List<BlockInfo> blocks = recoverBlocks.poll(maxTransfers);
     if(blocks == null)
       return null;
-    return blocks.toArray(new BlockInfoContiguousUnderConstruction[blocks.size()]);
+    return blocks.toArray(new BlockInfo[blocks.size()]);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 3397bbb..d1900f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1380,13 +1380,14 @@ public class DatanodeManager {
         }
 
         //check lease recovery
-        BlockInfoContiguousUnderConstruction[] blocks = nodeinfo
-            .getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
         if (blocks != null) {
           BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
               blocks.length);
-          for (BlockInfoContiguousUnderConstruction b : blocks) {
-            final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
+          for (BlockInfo b : blocks) {
+            BlockUnderConstructionFeature uc = b.getUnderConstructionFeature();
+            assert uc != null;
+            final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
             // Skip stale nodes during recovery - not heart beated for some time (30s by default).
             final List<DatanodeStorageInfo> recoveryLocations =
                 new ArrayList<>(storages.length);
@@ -1397,11 +1398,11 @@ public class DatanodeManager {
             }
             // If we are performing a truncate recovery than set recovery fields
             // to old block.
-            boolean truncateRecovery = b.getTruncateBlock() != null;
+            boolean truncateRecovery = uc.getTruncateBlock() != null;
             boolean copyOnTruncateRecovery = truncateRecovery &&
-                b.getTruncateBlock().getBlockId() != b.getBlockId();
+                uc.getTruncateBlock().getBlockId() != b.getBlockId();
             ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
-                new ExtendedBlock(blockPoolId, b.getTruncateBlock()) :
+                new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) :
                 new ExtendedBlock(blockPoolId, b);
             // If we only get 1 replica after eliminating stale nodes, then choose all
             // replicas for recovery and let the primary data node handle failures.
@@ -1420,12 +1421,12 @@ public class DatanodeManager {
             }
             if(truncateRecovery) {
               Block recoveryBlock = (copyOnTruncateRecovery) ? b :
-                  b.getTruncateBlock();
+                  uc.getTruncateBlock();
               brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
                                                 recoveryBlock));
             } else {
               brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
-                                                b.getBlockRecoveryId()));
+                                                uc.getBlockRecoveryId()));
             }
           }
           return new DatanodeCommand[] { brCommand };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
index 474c257..6d37530 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
@@ -28,8 +28,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@@ -95,7 +96,7 @@ final class FSDirTruncateOp {
       final BlockInfo last = file.getLastBlock();
       if (last != null && last.getBlockUCState()
           == BlockUCState.UNDER_RECOVERY) {
-        final Block truncatedBlock = ((BlockInfoContiguousUnderConstruction) last)
+        final Block truncatedBlock = last.getUnderConstructionFeature()
             .getTruncateBlock();
         if (truncatedBlock != null) {
           final long truncateLength = file.computeFileSize(false, false)
@@ -222,42 +223,42 @@ final class FSDirTruncateOp {
               oldBlock)));
     }
 
-    BlockInfoContiguousUnderConstruction truncatedBlockUC;
+    final BlockInfo truncatedBlockUC;
     BlockManager blockManager = fsn.getFSDirectory().getBlockManager();
     if (shouldCopyOnTruncate) {
       // Add new truncateBlock into blocksMap and
       // use oldBlock as a source for copy-on-truncate recovery
-      truncatedBlockUC = new BlockInfoContiguousUnderConstruction(newBlock,
+      truncatedBlockUC = new BlockInfoContiguous(newBlock,
           file.getPreferredBlockReplication());
+      truncatedBlockUC.convertToBlockUnderConstruction(
+          BlockUCState.UNDER_CONSTRUCTION, blockManager.getStorages(oldBlock));
       truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
-      truncatedBlockUC.setTruncateBlock(oldBlock);
-      file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
+      truncatedBlockUC.getUnderConstructionFeature().setTruncateBlock(oldBlock);
+      file.setLastBlock(truncatedBlockUC);
       blockManager.addBlockCollection(truncatedBlockUC, file);
 
       NameNode.stateChangeLog.debug(
           "BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new"
               + " size {}  new block {} old block {}",
-          truncatedBlockUC.getNumBytes(), newBlock,
-          truncatedBlockUC.getTruncateBlock());
+          truncatedBlockUC.getNumBytes(), newBlock, oldBlock);
     } else {
       // Use new generation stamp for in-place truncate recovery
       blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta);
       oldBlock = file.getLastBlock();
       assert !oldBlock.isComplete() : "oldBlock should be under construction";
-      truncatedBlockUC = (BlockInfoContiguousUnderConstruction) oldBlock;
-      truncatedBlockUC.setTruncateBlock(new Block(oldBlock));
-      truncatedBlockUC.getTruncateBlock().setNumBytes(
-          oldBlock.getNumBytes() - lastBlockDelta);
-      truncatedBlockUC.getTruncateBlock().setGenerationStamp(
-          newBlock.getGenerationStamp());
+      BlockUnderConstructionFeature uc = oldBlock.getUnderConstructionFeature();
+      uc.setTruncateBlock(new Block(oldBlock));
+      uc.getTruncateBlock().setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
+      uc.getTruncateBlock().setGenerationStamp(newBlock.getGenerationStamp());
+      truncatedBlockUC = oldBlock;
 
-      NameNode.stateChangeLog.debug(
-          "BLOCK* prepareFileForTruncate: {} Scheduling in-place block "
-              + "truncate to new size {}", truncatedBlockUC.getTruncateBlock()
-              .getNumBytes(), truncatedBlockUC);
+      NameNode.stateChangeLog.debug("BLOCK* prepareFileForTruncate: " +
+          "{} Scheduling in-place block truncate to new size {}",
+          uc, uc.getTruncateBlock().getNumBytes());
     }
     if (shouldRecoverNow) {
-      truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp());
+      truncatedBlockUC.getUnderConstructionFeature().initializeBlockRecovery(
+          truncatedBlockUC, newBlock.getGenerationStamp());
     }
 
     return newBlock;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 3d30a19..f04bec2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -43,8 +43,9 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
@@ -73,7 +74,7 @@ class FSDirWriteFileOp {
       Block block) throws IOException {
     // modify file-> block and blocksMap
     // fileNode should be under construction
-    BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
+    BlockInfo uc = fileNode.removeLastBlock(block);
     if (uc == null) {
       return false;
     }
@@ -236,8 +237,7 @@ class FSDirWriteFileOp {
       } else {
         // add new chosen targets to already allocated block and return
         BlockInfo lastBlockInFile = pendingFile.getLastBlock();
-        ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
-            .setExpectedLocations(targets);
+        lastBlockInFile.getUnderConstructionFeature().setExpectedLocations(lastBlockInFile.getGenerationStamp(), targets);
         offset = pendingFile.computeFileSize();
         return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
       }
@@ -520,12 +520,10 @@ class FSDirWriteFileOp {
           fileINode.getPreferredBlockReplication(), true);
 
       // associate new last block for the file
-      BlockInfoContiguousUnderConstruction blockInfo =
-        new BlockInfoContiguousUnderConstruction(
-            block,
-            fileINode.getFileReplication(),
-            HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
-            targets);
+      BlockInfo blockInfo = new BlockInfoContiguous(block,
+          fileINode.getFileReplication());
+      blockInfo.convertToBlockUnderConstruction(
+          HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
       fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
       fileINode.addBlock(blockInfo);
 
@@ -662,10 +660,10 @@ class FSDirWriteFileOp {
             "allocation of a new block in " + src + ". Returning previously" +
             " allocated block " + lastBlockInFile);
         long offset = file.computeFileSize();
-        BlockInfoContiguousUnderConstruction lastBlockUC =
-            (BlockInfoContiguousUnderConstruction) lastBlockInFile;
+        BlockUnderConstructionFeature uc =
+            lastBlockInFile.getUnderConstructionFeature();
         onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
-            lastBlockUC.getExpectedStorageLocations(), offset);
+            uc.getExpectedStorageLocations(), offset);
         return new FileState(file, src, iip);
       } else {
         // Case 3

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 3dd076d..dfe897a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@ -950,7 +949,7 @@ public class FSEditLogLoader {
     if (pBlock != null) { // the penultimate block is not null
       Preconditions.checkState(oldBlocks != null && oldBlocks.length > 0);
       // compare pBlock with the last block of oldBlocks
-      Block oldLastBlock = oldBlocks[oldBlocks.length - 1];
+      BlockInfo oldLastBlock = oldBlocks[oldBlocks.length - 1];
       if (oldLastBlock.getBlockId() != pBlock.getBlockId()
           || oldLastBlock.getGenerationStamp() != pBlock.getGenerationStamp()) {
         throw new IOException(
@@ -960,17 +959,18 @@ public class FSEditLogLoader {
       }
       
       oldLastBlock.setNumBytes(pBlock.getNumBytes());
-      if (oldLastBlock instanceof BlockInfoContiguousUnderConstruction) {
-        fsNamesys.getBlockManager().forceCompleteBlock(file,
-            (BlockInfoContiguousUnderConstruction) oldLastBlock);
+      if (!oldLastBlock.isComplete()) {
+        fsNamesys.getBlockManager().forceCompleteBlock(file, oldLastBlock);
         fsNamesys.getBlockManager().processQueuedMessagesForBlock(pBlock);
       }
     } else { // the penultimate block is null
       Preconditions.checkState(oldBlocks == null || oldBlocks.length == 0);
     }
     // add the new block
-    BlockInfo newBI = new BlockInfoContiguousUnderConstruction(
-          newBlock, file.getPreferredBlockReplication());
+    BlockInfo newBI = new BlockInfoContiguous(newBlock,
+        file.getPreferredBlockReplication());
+    newBI.convertToBlockUnderConstruction(
+        HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
     fsNamesys.getBlockManager().addBlockCollection(newBI, file);
     file.addBlock(newBI);
     fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
@@ -1010,11 +1010,10 @@ public class FSEditLogLoader {
         oldBlock.getGenerationStamp() != newBlock.getGenerationStamp();
       oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
       
-      if (oldBlock instanceof BlockInfoContiguousUnderConstruction &&
+      if (!oldBlock.isComplete() &&
           (!isLastBlock || op.shouldCompleteLastBlock())) {
         changeMade = true;
-        fsNamesys.getBlockManager().forceCompleteBlock(file,
-            (BlockInfoContiguousUnderConstruction) oldBlock);
+        fsNamesys.getBlockManager().forceCompleteBlock(file, oldBlock);
       }
       if (changeMade) {
         // The state or gen-stamp of the block has changed. So, we may be
@@ -1049,8 +1048,10 @@ public class FSEditLogLoader {
           // TODO: shouldn't this only be true for the last block?
           // what about an old-version fsync() where fsync isn't called
           // until several blocks in?
-          newBI = new BlockInfoContiguousUnderConstruction(
-              newBlock, file.getPreferredBlockReplication());
+          newBI = new BlockInfoContiguous(newBlock,
+              file.getPreferredBlockReplication());
+          newBI.convertToBlockUnderConstruction(
+              HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
         } else {
           // OP_CLOSE should add finalized blocks. This code path
           // is only executed when loading edits written by prior

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index 30517d0..92f333a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@ -777,8 +776,8 @@ public class FSImageFormat {
             // convert the last block to BlockUC
             if (blocks.length > 0) {
               BlockInfo lastBlk = blocks[blocks.length - 1];
-              blocks[blocks.length - 1] = new BlockInfoContiguousUnderConstruction(
-                  lastBlk, replication);
+              lastBlk.convertToBlockUnderConstruction(
+                  HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index e8378e5..25fd99d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -44,8 +44,8 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
@@ -363,9 +363,8 @@ public final class FSImageFormatPBINode {
         file.toUnderConstruction(uc.getClientName(), uc.getClientMachine());
         if (blocks.length > 0) {
           BlockInfo lastBlk = file.getLastBlock();
-          // replace the last block of file
-          file.setBlock(file.numBlocks() - 1, new BlockInfoContiguousUnderConstruction(
-              lastBlk, replication));
+          lastBlk.convertToBlockUnderConstruction(
+              HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
         }
       }
       return file;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index f71cf0b..96e4ecb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
@@ -137,8 +136,9 @@ public class FSImageSerialization {
     // last block is UNDER_CONSTRUCTION
     if(numBlocks > 0) {
       blk.readFields(in);
-      blocks[i] = new BlockInfoContiguousUnderConstruction(
-        blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
+      blocks[i] = new BlockInfoContiguous(blk, blockReplication);
+      blocks[i].convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
+          null);
     }
     PermissionStatus perm = PermissionStatus.read(in);
     String clientName = readString(in);
@@ -180,9 +180,9 @@ public class FSImageSerialization {
 
   /**
    * Serialize a {@link INodeFile} node
-   * @param node The node to write
+   * @param file The INodeFile to write
    * @param out The {@link DataOutputStream} where the fields are written
-   * @param writeBlock Whether to write block information
+   * @param writeUnderConstruction Whether to write under construction information
    */
   public static void writeINodeFile(INodeFile file, DataOutput out,
       boolean writeUnderConstruction) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index d34242c..4c71cd5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -142,7 +142,6 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -199,8 +198,8 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
@@ -3059,8 +3058,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
 
     // If penultimate block doesn't exist then its minReplication is met
-    boolean penultimateBlockMinReplication = penultimateBlock == null ? true :
-        blockManager.checkMinReplication(penultimateBlock);
+    boolean penultimateBlockMinReplication = penultimateBlock == null
+        || blockManager.checkMinReplication(penultimateBlock);
 
     switch(lastBlockState) {
     case COMPLETE:
@@ -3089,24 +3088,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throw new AlreadyBeingCreatedException(message);
     case UNDER_CONSTRUCTION:
     case UNDER_RECOVERY:
-      final BlockInfoContiguousUnderConstruction uc = (BlockInfoContiguousUnderConstruction)lastBlock;
+      BlockUnderConstructionFeature uc = lastBlock.getUnderConstructionFeature();
       // determine if last block was intended to be truncated
       Block recoveryBlock = uc.getTruncateBlock();
       boolean truncateRecovery = recoveryBlock != null;
       boolean copyOnTruncate = truncateRecovery &&
-          recoveryBlock.getBlockId() != uc.getBlockId();
+          recoveryBlock.getBlockId() != lastBlock.getBlockId();
       assert !copyOnTruncate ||
-          recoveryBlock.getBlockId() < uc.getBlockId() &&
-          recoveryBlock.getGenerationStamp() < uc.getGenerationStamp() &&
-          recoveryBlock.getNumBytes() > uc.getNumBytes() :
+          recoveryBlock.getBlockId() < lastBlock.getBlockId() &&
+          recoveryBlock.getGenerationStamp() < lastBlock.getGenerationStamp() &&
+          recoveryBlock.getNumBytes() > lastBlock.getNumBytes() :
             "wrong recoveryBlock";
 
       // setup the last block locations from the blockManager if not known
       if (uc.getNumExpectedLocations() == 0) {
-        uc.setExpectedLocations(blockManager.getStorages(lastBlock));
+        uc.setExpectedLocations(lastBlock.getGenerationStamp(),
+            blockManager.getStorages(lastBlock));
       }
 
-      if (uc.getNumExpectedLocations() == 0 && uc.getNumBytes() == 0) {
+      if (uc.getNumExpectedLocations() == 0 && lastBlock.getNumBytes() == 0) {
         // There is no datanode reported to this block.
         // may be client have crashed before writing data to pipeline.
         // This blocks doesn't need any recovery.
@@ -3119,14 +3119,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         return true;
       }
       // start recovery of the last block for this file
-      long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc));
+      long blockRecoveryId = nextGenerationStamp(
+          blockIdManager.isLegacyBlock(lastBlock));
       lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
       if(copyOnTruncate) {
-        uc.setGenerationStamp(blockRecoveryId);
+        lastBlock.setGenerationStamp(blockRecoveryId);
       } else if(truncateRecovery) {
         recoveryBlock.setGenerationStamp(blockRecoveryId);
       }
-      uc.initializeBlockRecovery(blockRecoveryId);
+      uc.initializeBlockRecovery(lastBlock, blockRecoveryId);
       leaseManager.renewLease(lease);
       // Cannot close file right now, since the last block requires recovery.
       // This may potentially cause infinite loop in lease recovery
@@ -3205,7 +3206,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
   
   @Override
-  public boolean isInSnapshot(BlockInfoContiguousUnderConstruction blockUC) {
+  public boolean isInSnapshot(BlockInfo blockUC) {
     assert hasReadLock();
     final BlockCollection bc = blockUC.getBlockCollection();
     if (bc == null || !(bc instanceof INodeFile)
@@ -3252,7 +3253,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     waitForLoadingFSImage();
     writeLock();
     boolean copyTruncate = false;
-    BlockInfoContiguousUnderConstruction truncatedBlock = null;
+    BlockInfo truncatedBlock = null;
     try {
       checkOperation(OperationCategory.WRITE);
       // If a DN tries to commit to the standby, the recovery will
@@ -3309,9 +3310,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         return;
       }
 
-      truncatedBlock = (BlockInfoContiguousUnderConstruction) iFile
-          .getLastBlock();
-      long recoveryId = truncatedBlock.getBlockRecoveryId();
+      truncatedBlock = iFile.getLastBlock();
+      long recoveryId = truncatedBlock.getUnderConstructionFeature()
+          .getBlockRecoveryId();
       copyTruncate = truncatedBlock.getBlockId() != storedBlock.getBlockId();
       if(recoveryId != newgenerationstamp) {
         throw new IOException("The recovery id " + newgenerationstamp
@@ -3374,9 +3375,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                 trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
                 trimmedStorages.toArray(new String[trimmedStorages.size()]));
         if(copyTruncate) {
-          iFile.setLastBlock(truncatedBlock, trimmedStorageInfos);
+          iFile.convertLastBlockToUC(truncatedBlock, trimmedStorageInfos);
         } else {
-          iFile.setLastBlock(storedBlock, trimmedStorageInfos);
+          iFile.convertLastBlockToUC(storedBlock, trimmedStorageInfos);
           if (closeFile) {
             blockManager.markBlockReplicasAsCorrupt(storedBlock,
                 oldGenerationStamp, oldNumBytes, trimmedStorageInfos);
@@ -5351,8 +5352,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     assert hasWriteLock();
     // check the vadility of the block and lease holder name
     final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
-    final BlockInfoContiguousUnderConstruction blockinfo
-        = (BlockInfoContiguousUnderConstruction)pendingFile.getLastBlock();
+    final BlockInfo blockinfo = pendingFile.getLastBlock();
+    assert !blockinfo.isComplete();
 
     // check new GS & length: this is not expected
     if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() ||
@@ -5371,7 +5372,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // find the DatanodeDescriptor objects
     final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
         .getDatanodeStorageInfos(newNodes, newStorageIDs);
-    blockinfo.setExpectedLocations(storages);
+    blockinfo.getUnderConstructionFeature().setExpectedLocations(
+        blockinfo.getGenerationStamp(), storages);
 
     String src = pendingFile.getFullPathName();
     FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
index d07ae1f..26ee8f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 
 /**
@@ -61,7 +60,7 @@ public class FileUnderConstructionFeature implements INode.Feature {
     BlockInfo lastBlock = f.getLastBlock();
     assert (lastBlock != null) : "The last block for path "
         + f.getFullPathName() + " is null when updating its length";
-    assert (lastBlock instanceof BlockInfoContiguousUnderConstruction)
+    assert !lastBlock.isComplete()
         : "The last block for path " + f.getFullPathName()
             + " is not a BlockInfoUnderConstruction when updating its length";
     lastBlock.setNumBytes(lastBlockLength);
@@ -76,9 +75,8 @@ public class FileUnderConstructionFeature implements INode.Feature {
       final BlocksMapUpdateInfo collectedBlocks) {
     final BlockInfo[] blocks = f.getBlocks();
     if (blocks != null && blocks.length > 0
-        && blocks[blocks.length - 1] instanceof BlockInfoContiguousUnderConstruction) {
-      BlockInfoContiguousUnderConstruction lastUC =
-          (BlockInfoContiguousUnderConstruction) blocks[blocks.length - 1];
+        && !blocks[blocks.length - 1].isComplete()) {
+      BlockInfo lastUC = blocks[blocks.length - 1];
       if (lastUC.getNumBytes() == 0) {
         // this is a 0-sized block. do not need check its UC state here
         collectedBlocks.addDeleteBlock(lastUC);


Mime
View raw message