hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vinayakum...@apache.org
Subject [3/4] hadoop git commit: HDFS-8999. Allow a file to be closed with COMMITTED but not yet COMPLETE blocks.
Date Wed, 03 Feb 2016 02:13:23 GMT
HDFS-8999. Allow a file to be closed with COMMITTED but not yet COMPLETE blocks.

(cherry picked from commit b10d8ced21a860390c46e7729a02b81d9f7b88e6)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0ad3c51d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0ad3c51d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0ad3c51d

Branch: refs/heads/branch-2.8
Commit: 0ad3c51dfb4ca50677e078ed870e8ec120436ea4
Parents: 1d15c90
Author: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Authored: Thu Jan 28 10:42:40 2016 +0800
Committer: Vinayakumar B <vinayakumarb@apache.org>
Committed: Wed Feb 3 07:41:06 2016 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 41 ++++++++---
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   | 55 +++++++++------
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  3 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  4 ++
 .../server/blockmanagement/BlockManager.java    | 31 +++-----
 .../hdfs/server/namenode/FSDirAppendOp.java     | 15 +++-
 .../hdfs/server/namenode/FSDirWriteFileOp.java  |  4 +-
 .../hdfs/server/namenode/FSEditLogLoader.java   |  8 +--
 .../hdfs/server/namenode/FSNamesystem.java      | 74 +++++++++++++++-----
 .../hadoop/hdfs/server/namenode/INodeFile.java  | 56 +++++++++++----
 .../hdfs/server/namenode/LeaseManager.java      | 17 ++---
 .../org/apache/hadoop/hdfs/TestFileAppend.java  | 56 ++++++++++++++-
 .../hdfs/server/namenode/TestINodeFile.java     |  6 +-
 .../hdfs/server/namenode/TestLeaseManager.java  |  4 +-
 15 files changed, 271 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 088b10e..6271da2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -34,7 +34,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.lang.reflect.Proxy;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -168,6 +167,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.RpcNoSuchMethodException;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
@@ -182,16 +182,15 @@ import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.net.InetAddresses;
-import org.apache.htrace.core.Tracer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and
@@ -1355,17 +1354,43 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  /**
+   * Invoke namenode append RPC.
+   * It retries in case of {@link BlockNotYetCompleteException}.
+   */
+  private LastBlockWithStatus callAppend(String src,
+      EnumSetWritable<CreateFlag> flag) throws IOException {
+    final long startTime = Time.monotonicNow();
+    for(;;) {
+      try {
+        return namenode.append(src, clientName, flag);
+      } catch(RemoteException re) {
+        if (Time.monotonicNow() - startTime > 5000
+            || !RetriableException.class.getName().equals(
+                re.getClassName())) {
+          throw re;
+        }
+
+        try { // sleep and retry
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          throw DFSUtilClient.toInterruptedIOException("callAppend", e);
+        }
+      }
+    }
+  }
+
   /** Method to get stream returned by append call */
   private DFSOutputStream callAppend(String src, EnumSet<CreateFlag> flag,
       Progressable progress, String[] favoredNodes) throws IOException {
     CreateFlag.validateForAppend(flag);
     try {
-      LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
+      final LastBlockWithStatus blkWithStatus = callAppend(src,
           new EnumSetWritable<>(flag, CreateFlag.class));
       HdfsFileStatus status = blkWithStatus.getFileStatus();
       if (status == null) {
-        DFSClient.LOG.debug("NameNode is on an older version, request file " +
-            "info with additional RPC call for file: " + src);
+        LOG.debug("NameNode is on an older version, request file " +
+            "info with additional RPC call for file: {}", src);
         status = getFileInfo(src);
       }
       return DFSOutputStream.newStreamForAppend(this, src, flag, progress,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 61a34c2..2e891a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -17,9 +17,29 @@
  */
 package org.apache.hadoop.hdfs;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-import com.google.common.primitives.SignedBytes;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.SocketChannel;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import javax.net.SocketFactory;
+
 import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
@@ -52,26 +72,9 @@ import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.SocketFactory;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.SignedBytes;
 
 public class DFSUtilClient {
   public static final byte[] EMPTY_BYTES = {};
@@ -676,4 +679,10 @@ public class DFSUtilClient {
     }
   }
 
+  public static InterruptedIOException toInterruptedIOException(String message,
+      InterruptedException e) {
+    final InterruptedIOException iioe = new InterruptedIOException(message);
+    iioe.initCause(e);
+    return iioe;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index b406c67..abbb1ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -455,8 +455,7 @@ class DataStreamer extends Daemon {
     setPipeline(lastBlock);
     if (nodes.length < 1) {
       throw new IOException("Unable to retrieve blocks locations " +
-          " for last block " + block +
-          "of file " + src);
+          " for last block " + block + " of file " + src);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1eb7ff7..824b869 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -956,6 +956,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9436. Make NNThroughputBenchmark$BlockReportStats run with 10
     datanodes by default. (Mingliang Liu via shv)
 
+    HDFS-8999. Allow a file to be closed with COMMITTED but not yet COMPLETE
+    blocks.  (szetszwo)
+
   BUG FIXES
 
     HDFS-8091: ACLStatus and XAttributes should be presented to

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 7450730..f981b33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -189,6 +189,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_REPLICATION_MIN_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
   public static final int     DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1;
+  public static final String  DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY
+      = "dfs.namenode.file.close.num-committed-allowed";
+  public static final int     DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT
+      = 0;
   public static final String  DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY =
       "dfs.namenode.safemode.replication.min";
   public static final String  DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e6ec759..6bf0cc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -618,6 +618,10 @@ public class BlockManager implements BlockStatsMXBean {
     return (countNodes(block).liveReplicas() >= minReplication);
   }
 
+  public short getMinReplication() {
+    return minReplication;
+  }
+
   /**
    * Commit a block of a file
    * 
@@ -665,7 +669,7 @@ public class BlockManager implements BlockStatsMXBean {
     final boolean b = commitBlock(lastBlock, commitBlock);
     if (countNodes(lastBlock).liveReplicas() >= minReplication) {
       if (b) {
-        addExpectedReplicasToPending(lastBlock);
+        addExpectedReplicasToPending(lastBlock, bc);
       }
       completeBlock(lastBlock, false);
     }
@@ -677,6 +681,10 @@ public class BlockManager implements BlockStatsMXBean {
    * pendingReplications in order to keep ReplicationMonitor from scheduling
    * the block.
    */
+  public void addExpectedReplicasToPending(BlockInfo blk, BlockCollection bc) {
+    addExpectedReplicasToPending(blk);
+  }
+
   private void addExpectedReplicasToPending(BlockInfo lastBlock) {
     DatanodeStorageInfo[] expectedStorages =
         lastBlock.getUnderConstructionFeature().getExpectedStorageLocations();
@@ -2617,7 +2625,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
         numLiveReplicas >= minReplication) {
-      addExpectedReplicasToPending(storedBlock);
+      addExpectedReplicasToPending(storedBlock, bc);
       completeBlock(storedBlock, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
@@ -3453,25 +3461,6 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
-  /**
-   * Check that the indicated blocks are present and
-   * replicated.
-   */
-  public boolean checkBlocksProperlyReplicated(
-      String src, BlockInfo[] blocks) {
-    for (BlockInfo b: blocks) {
-      if (!b.isComplete()) {
-        final int numNodes = b.numNodes();
-        LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = "
-          + b.getBlockUCState() + ", replication# = " + numNodes
-          + (numNodes < minReplication ? " < ": " >= ")
-          + " minimum = " + minReplication + ") in file " + src);
-        return false;
-      }
-    }
-    return true;
-  }
-
   /** 
    * @return 0 if the block is not found;
    *         otherwise, return the replication factor of the block.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
index 53255e6..e5b1392 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
@@ -33,8 +33,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
+import org.apache.hadoop.ipc.RetriableException;
 
 import com.google.common.base.Preconditions;
 
@@ -119,10 +121,17 @@ final class FSDirAppendOp {
 
       final BlockInfo lastBlock = file.getLastBlock();
       // Check that the block has at least minimum replication.
-      if (lastBlock != null && lastBlock.isComplete()
+      if (lastBlock != null) {
+        if (lastBlock.getBlockUCState() == BlockUCState.COMMITTED) {
+          throw new RetriableException(
+              new NotReplicatedYetException("append: lastBlock="
+                  + lastBlock + " of src=" + path
+                  + " is COMMITTED but not yet COMPLETE."));
+        } else if (lastBlock.isComplete()
           && !blockManager.isSufficientlyReplicated(lastBlock)) {
-        throw new IOException("append: lastBlock=" + lastBlock + " of src="
-            + path + " is not sufficiently replicated yet.");
+          throw new IOException("append: lastBlock=" + lastBlock + " of src="
+              + path + " is not sufficiently replicated yet.");
+        }
       }
       lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock,
           true, logRetryCache);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 3662bce..17e2459 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -758,8 +758,10 @@ class FSDirWriteFileOp {
       return false;
     }
 
+    fsn.addCommittedBlocksToPending(pendingFile);
+
     fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
-        Snapshot.CURRENT_STATE_ID);
+        Snapshot.CURRENT_STATE_ID, true);
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 7077990..094bb9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
@@ -29,7 +28,6 @@ import java.util.EnumMap;
 import java.util.EnumSet;
 import java.util.List;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -89,6 +87,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@@ -446,8 +445,9 @@ public class FSEditLogLoader {
       // One might expect that you could use removeLease(holder, path) here,
       // but OP_CLOSE doesn't serialize the holder. So, remove the inode.
       if (file.isUnderConstruction()) {
-        fsNamesys.leaseManager.removeLeases(Lists.newArrayList(file.getId()));
-        file.toCompleteFile(file.getModificationTime());
+        fsNamesys.getLeaseManager().removeLease(file.getId());
+        file.toCompleteFile(file.getModificationTime(), 0,
+            fsNamesys.getBlockManager().getMinReplication());
       }
       break;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 1ecb286..e8eae64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -463,6 +463,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private final long minBlockSize;         // minimum block size
   final long maxBlocksPerFile;     // maximum # of blocks per file
+  private final int numCommittedAllowed;
 
   /** Lock to protect FSNamesystem. */
   private final FSNamesystemLock fsLock;
@@ -769,6 +770,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
       this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
           DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
+      this.numCommittedAllowed = conf.getInt(
+              DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY,
+              DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT);
       this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT);
       LOG.info("Append Enabled: " + supportAppends);
 
@@ -2602,18 +2606,37 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   boolean checkFileProgress(String src, INodeFile v, boolean checkall) {
     assert hasReadLock();
     if (checkall) {
-      return blockManager.checkBlocksProperlyReplicated(src, v
-          .getBlocks());
+      return checkBlocksComplete(src, true, v.getBlocks());
     } else {
-      // check the penultimate block of this file
-      BlockInfo b = v.getPenultimateBlock();
-      return b == null ||
-          blockManager.checkBlocksProperlyReplicated(
-              src, new BlockInfo[] { b });
+      final BlockInfo[] blocks = v.getBlocks();
+      final int i = blocks.length - numCommittedAllowed - 2;
+      return i < 0 || blocks[i] == null
+          || checkBlocksComplete(src, false, blocks[i]);
     }
   }
 
   /**
+   * Check if the blocks are COMPLETE;
+   * it may allow the last block to be COMMITTED.
+   */
+  private boolean checkBlocksComplete(String src, boolean allowCommittedBlock,
+      BlockInfo... blocks) {
+    final int n = allowCommittedBlock? numCommittedAllowed: 0;
+    for(int i = 0; i < blocks.length; i++) {
+      final short min = blockManager.getMinReplication();
+      final String err = INodeFile.checkBlockComplete(blocks, i, n, min);
+      if (err != null) {
+        final int numNodes = blocks[i].numNodes();
+        LOG.info("BLOCK* " + err + "(numNodes= " + numNodes
+            + (numNodes < min ? " < " : " >= ")
+            + " minimum = " + min + ") in file " + src);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
    * Change the indicated filename. 
    * @deprecated Use {@link #renameTo(String, String, boolean,
    * Options.Rename...)} instead.
@@ -2746,7 +2769,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       List<INode> removedINodes,
       final boolean acquireINodeMapLock) {
     assert hasWriteLock();
-    leaseManager.removeLeases(removedUCFiles);
+    for(long i : removedUCFiles) {
+      leaseManager.removeLease(i);
+    }
     // remove inodes from inodesMap
     if (removedINodes != null) {
       if (acquireINodeMapLock) {
@@ -3054,7 +3079,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // then reap lease immediately and close the file.
     if(nrCompleteBlocks == nrBlocks) {
       finalizeINodeFileUnderConstruction(src, pendingFile,
-          iip.getLatestSnapshotId());
+          iip.getLatestSnapshotId(), false);
       NameNode.stateChangeLog.warn("BLOCK*"
         + " internalReleaseLease: All existing blocks are COMPLETE,"
         + " lease removed, file closed.");
@@ -3093,7 +3118,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if(penultimateBlockMinReplication &&
           blockManager.checkMinReplication(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile,
-            iip.getLatestSnapshotId());
+            iip.getLatestSnapshotId(), false);
         NameNode.stateChangeLog.warn("BLOCK*"
           + " internalReleaseLease: Committed blocks are minimally replicated,"
           + " lease removed, file closed.");
@@ -3137,7 +3162,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         // We can remove this block and close the file.
         pendingFile.removeLastBlock(lastBlock);
         finalizeINodeFileUnderConstruction(src, pendingFile,
-            iip.getLatestSnapshotId());
+            iip.getLatestSnapshotId(), false);
         NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: "
             + "Removed empty last block and closed file.");
         return true;
@@ -3202,8 +3227,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
-  void finalizeINodeFileUnderConstruction(
-      String src, INodeFile pendingFile, int latestSnapshot) throws IOException {
+  void addCommittedBlocksToPending(final INodeFile pendingFile) {
+    final BlockInfo[] blocks = pendingFile.getBlocks();
+    int i = blocks.length - numCommittedAllowed;
+    if (i < 0) {
+      i = 0;
+    }
+    for(; i < blocks.length; i++) {
+      final BlockInfo b = blocks[i];
+      if (b != null && b.getBlockUCState() == BlockUCState.COMMITTED) {
+        // b is COMMITTED but not yet COMPLETE, add it to pending replication.
+        blockManager.addExpectedReplicasToPending(b, pendingFile);
+      }
+    }
+  }
+
+  void finalizeINodeFileUnderConstruction(String src, INodeFile pendingFile,
+      int latestSnapshot, boolean allowCommittedBlock) throws IOException {
     assert hasWriteLock();
 
     FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
@@ -3218,7 +3258,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // The file is no longer pending.
     // Create permanent INode, update blocks. No need to replace the inode here
     // since we just remove the uc feature from pendingFile
-    pendingFile.toCompleteFile(now());
+    pendingFile.toCompleteFile(now(),
+        allowCommittedBlock? numCommittedAllowed: 0,
+        blockManager.getMinReplication());
 
     waitForLoadingFSImage();
     // close file and persist block allocations for this file
@@ -3468,8 +3510,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     commitOrCompleteLastBlock(pendingFile, iip, storedBlock);
 
     //remove lease, close file
-    finalizeINodeFileUnderConstruction(src, pendingFile,
-        Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID));
+    int s = Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID);
+    finalizeINodeFileUnderConstruction(src, pendingFile, s, false);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 2eb9a80..e674c5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -201,28 +201,56 @@ public class INodeFile extends INodeWithAdditionalFields
    * Convert the file to a complete file, i.e., to remove the Under-Construction
    * feature.
    */
-  public INodeFile toCompleteFile(long mtime) {
-    Preconditions.checkState(isUnderConstruction(),
-        "file is no longer under construction");
-    FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
-    if (uc != null) {
-      assertAllBlocksComplete();
-      removeFeature(uc);
-      this.setModificationTime(mtime);
-    }
-    return this;
+  void toCompleteFile(long mtime, int numCommittedAllowed, short minReplication) {
+    final FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
+    Preconditions.checkNotNull(uc, "File %s is not under construction", this);
+    assertAllBlocksComplete(numCommittedAllowed, minReplication);
+    removeFeature(uc);
+    setModificationTime(mtime);
   }
 
   /** Assert all blocks are complete. */
-  private void assertAllBlocksComplete() {
+  private void assertAllBlocksComplete(int numCommittedAllowed,
+      short minReplication) {
     if (blocks == null) {
       return;
     }
     for (int i = 0; i < blocks.length; i++) {
-      Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize"
-          + " %s %s since blocks[%s] is non-complete, where blocks=%s.",
-          getClass().getSimpleName(), this, i, Arrays.asList(blocks));
+      final String err = checkBlockComplete(blocks, i, numCommittedAllowed,
+          minReplication);
+      Preconditions.checkState(err == null,
+          "Unexpected block state: %s, file=%s (%s), blocks=%s (i=%s)",
+          err, this, getClass().getSimpleName(), Arrays.asList(blocks), i);
+    }
+  }
+
+  /**
+   * Check if the i-th block is COMPLETE;
+   * when the i-th block is the last block, it may be allowed to be COMMITTED.
+   *
+   * @return null if the block passes the check;
+   *              otherwise, return an error message.
+   */
+  static String checkBlockComplete(BlockInfo[] blocks, int i,
+      int numCommittedAllowed, short minReplication) {
+    final BlockInfo b = blocks[i];
+    final BlockUCState state = b.getBlockUCState();
+    if (state == BlockUCState.COMPLETE) {
+      return null;
+    }
+    if (i < blocks.length - numCommittedAllowed) {
+      return b + " is " + state + " but not COMPLETE";
     }
+    if (state != BlockUCState.COMMITTED) {
+      return b + " is " + state + " but neither COMPLETE nor COMMITTED";
+    }
+    final int numExpectedLocations
+        = b.getUnderConstructionFeature().getNumExpectedLocations();
+    if (numExpectedLocations <= minReplication) {
+      return b + " is " + state + " but numExpectedLocations = "
+          + numExpectedLocations + " <= minReplication = " + minReplication;
+    }
+    return null;
   }
 
   @Override // BlockCollection

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
index 908af45..6bc9e34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
@@ -160,6 +160,13 @@ public class LeaseManager {
     return lease;
   }
 
+  /**
+   * Remove the lease registered for the given inode id, if there is one.
+   * Looks the lease up in leasesById and, when found, delegates to the
+   * two-argument removeLease(lease, inodeId); otherwise does nothing.
+   * Runs while holding this LeaseManager's monitor.
+   *
+   * @param inodeId the inode id whose lease should be removed.
+   */
+  synchronized void removeLease(long inodeId) {
+    final Lease lease = leasesById.get(inodeId);
+    if (lease != null) {
+      removeLease(lease, inodeId);
+    }
+  }
+
   /**
    * Remove the specified lease and src.
    */
@@ -298,16 +305,6 @@ public class LeaseManager {
     }
   }
 
-  @VisibleForTesting
-  synchronized void removeLeases(Collection<Long> inodes) {
-    for (long inode : inodes) {
-      Lease lease = leasesById.get(inode);
-      if (lease != null) {
-        removeLease(lease, inode);
-      }
-    }
-  }
-
   public void setLeasePeriod(long softLimit, long hardLimit) {
     this.softLimit = softLimit;
     this.hardLimit = hardLimit; 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index ea1d0a6..84699a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -27,10 +27,12 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HardLink;
@@ -41,12 +43,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -55,6 +57,8 @@ import org.junit.Test;
  * support HDFS appends.
  */
 public class TestFileAppend{
+  private static final long RANDOM_TEST_RUNTIME = 10000;
+
   final boolean simulatedStorage = false;
 
   private static byte[] fileContents = null;
@@ -381,6 +385,56 @@ public class TestFileAppend{
     }
   }
 
+
+  /**
+   * Appends to a single file many times with
+   * DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY set to 1, then checks
+   * that the final length and contents match what was written.
+   * Each append writes 1 to 100 bytes taken from {@code data} at the current
+   * file offset. The loop runs for at least 10 iterations or until
+   * RANDOM_TEST_RUNTIME has elapsed since {@code startTime}, whichever is
+   * later, but stops early once the next append would run past the end of
+   * {@code data}.
+   */
+  @Test
+  public void testMultipleAppends() throws Exception {
+    final long startTime = Time.monotonicNow();
+    final Configuration conf = new HdfsConfiguration();
+    // Presumably feeds numCommittedAllowed in INodeFile#checkBlockComplete,
+    // letting close() succeed with one trailing COMMITTED block - confirm.
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(4).build();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    try {
+      final Path p = new Path("/testMultipleAppend/foo");
+      final int blockSize = 1 << 16;
+      // NOTE(review): presumably data.length == blockSize, so the file never
+      // grows past a single block - confirm against AppendTestUtil.
+      final byte[] data = AppendTestUtil.initBuffer(blockSize);
+
+      // create an empty file.
+      fs.create(p, true, 4096, (short)3, blockSize).close();
+
+      int fileLen = 0;
+      for(int i = 0;
+          i < 10 || Time.monotonicNow() - startTime < RANDOM_TEST_RUNTIME;
+          i++) {
+        int appendLen = ThreadLocalRandom.current().nextInt(100) + 1;
+        if (fileLen + appendLen > data.length) {
+          break;
+        }
+
+        AppendTestUtil.LOG.info(i + ") fileLen="  + fileLen
+            + ", appendLen=" + appendLen);
+        // NOTE(review): out is not closed if write() throws;
+        // try-with-resources would be safer.
+        final FSDataOutputStream out = fs.append(p);
+        out.write(data, fileLen, appendLen);
+        out.close();
+        fileLen += appendLen;
+      }
+
+      // Verify the length, then read the file back and compare every byte
+      // against the prefix of data that was appended.
+      Assert.assertEquals(fileLen, fs.getFileStatus(p).getLen());
+      final byte[] actual = new byte[fileLen];
+      // NOTE(review): in is not closed if readFully() throws;
+      // try-with-resources would be safer.
+      final FSDataInputStream in = fs.open(p);
+      in.readFully(actual);
+      in.close();
+      for(int i = 0; i < fileLen; i++) {
+        Assert.assertEquals(data[i], actual[i]);
+      }
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
   /** Tests appending after soft-limit expires. */
   @Test
   public void testAppendAfterSoftLimit() 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
index 4d93a32..b5bb5d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
@@ -92,6 +92,10 @@ public class TestINodeFile {
         (short)3, 1024L);
   }
 
+  /**
+   * Test helper: calls file.toCompleteFile with Time.now(), 0 and (short)1.
+   * NOTE(review): presumably 0 is numCommittedAllowed and 1 is
+   * minReplication, so every block must already be COMPLETE - confirm
+   * against INodeFile#toCompleteFile.
+   */
+  static void toCompleteFile(INodeFile file) {
+    file.toCompleteFile(Time.now(), 0, (short)1);
+  }
+
   INodeFile createINodeFile(short replication, long preferredBlockSize) {
     return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
         null, replication, preferredBlockSize, (byte)0);
@@ -1089,7 +1093,7 @@ public class TestINodeFile {
     assertEquals(clientName, uc.getClientName());
     assertEquals(clientMachine, uc.getClientMachine());
 
-    file.toCompleteFile(Time.now());
+    toCompleteFile(file);
     assertFalse(file.isUnderConstruction());
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
index de30161..3bb7bb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java
@@ -51,8 +51,8 @@ public class TestLeaseManager {
     }
 
     assertEquals(4, lm.getINodeIdWithLeases().size());
-    synchronized (lm) {
-      lm.removeLeases(ids);
+    for (long id : ids) {
+      lm.removeLease(id);
     }
     assertEquals(0, lm.getINodeIdWithLeases().size());
   }


Mime
View raw message