hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject svn commit: r1609878 [3/9] - in /hadoop/common/branches/YARN-1051/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-hdfs...
Date Sat, 12 Jul 2014 02:24:55 GMT
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java Sat Jul 12 02:24:40 2014
@@ -23,14 +23,14 @@ import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SnapshotDiffInfo;
+
+import com.google.common.base.Objects;
 
 /**
  * This class represents to end users the difference between two snapshots of 
  * the same directory, or the difference between a snapshot of the directory and
- * its current state. Instead of capturing all the details of the diff, which 
- * is stored in {@link SnapshotDiffInfo}, this class only lists where the 
- * changes happened and their types.
+ * its current state. Instead of capturing all the details of the diff, this
+ * class only lists where the changes happened and their types.
  */
 public class SnapshotDiffReport {
   private final static String LINE_SEPARATOR = System.getProperty(
@@ -79,43 +79,64 @@ public class SnapshotDiffReport {
     /** The type of the difference. */
     private final DiffType type;
     /**
-     * The relative path (related to the snapshot root) of the file/directory
-     * where changes have happened
+     * The relative path (relative to the snapshot root) of 1) the file/directory
+     * where changes have happened, or 2) the source file/dir of a rename op.
      */
-    private final byte[] relativePath;
+    private final byte[] sourcePath;
+    private final byte[] targetPath;
+
+    public DiffReportEntry(DiffType type, byte[] sourcePath) {
+      this(type, sourcePath, null);
+    }
+
+    public DiffReportEntry(DiffType type, byte[][] sourcePathComponents) {
+      this(type, sourcePathComponents, null);
+    }
 
-    public DiffReportEntry(DiffType type, byte[] path) {
+    public DiffReportEntry(DiffType type, byte[] sourcePath, byte[] targetPath) {
       this.type = type;
-      this.relativePath = path;
+      this.sourcePath = sourcePath;
+      this.targetPath = targetPath;
     }
     
-    public DiffReportEntry(DiffType type, byte[][] pathComponents) {
+    public DiffReportEntry(DiffType type, byte[][] sourcePathComponents,
+        byte[][] targetPathComponents) {
       this.type = type;
-      this.relativePath = DFSUtil.byteArray2bytes(pathComponents);
+      this.sourcePath = DFSUtil.byteArray2bytes(sourcePathComponents);
+      this.targetPath = targetPathComponents == null ? null : DFSUtil
+          .byteArray2bytes(targetPathComponents);
     }
     
     @Override
     public String toString() {
-      return type.getLabel() + "\t" + getRelativePathString();
+      String str = type.getLabel() + "\t" + getPathString(sourcePath);
+      if (type == DiffType.RENAME) {
+        str += " -> " + getPathString(targetPath);
+      }
+      return str;
     }
     
     public DiffType getType() {
       return type;
     }
 
-    public String getRelativePathString() {
-      String path = DFSUtil.bytes2String(relativePath);
-      if (path.isEmpty()) {
+    static String getPathString(byte[] path) {
+      String pathStr = DFSUtil.bytes2String(path);
+      if (pathStr.isEmpty()) {
         return Path.CUR_DIR;
       } else {
-        return Path.CUR_DIR + Path.SEPARATOR + path;
+        return Path.CUR_DIR + Path.SEPARATOR + pathStr;
       }
     }
 
-    public byte[] getRelativePath() {
-      return relativePath;
+    public byte[] getSourcePath() {
+      return sourcePath;
     }
-    
+
+    public byte[] getTargetPath() {
+      return targetPath;
+    }
+
     @Override
     public boolean equals(Object other) {
       if (this == other) {
@@ -124,14 +145,15 @@ public class SnapshotDiffReport {
       if (other != null && other instanceof DiffReportEntry) {
         DiffReportEntry entry = (DiffReportEntry) other;
         return type.equals(entry.getType())
-            && Arrays.equals(relativePath, entry.getRelativePath());
+            && Arrays.equals(sourcePath, entry.getSourcePath())
+            && Arrays.equals(targetPath, entry.getTargetPath());
       }
       return false;
     }
     
     @Override
     public int hashCode() {
-      return Arrays.hashCode(relativePath);
+      return Objects.hashCode(getSourcePath(), getTargetPath());
     }
   }
   

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Sat Jul 12 02:24:40 2014
@@ -44,7 +44,6 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -1737,24 +1736,29 @@ public class PBHelper {
     }
     DiffType type = DiffType.getTypeFromLabel(entry
         .getModificationLabel());
-    return type == null ? null : 
-      new DiffReportEntry(type, entry.getFullpath().toByteArray());
+    return type == null ? null : new DiffReportEntry(type, entry.getFullpath()
+        .toByteArray(), entry.hasTargetPath() ? entry.getTargetPath()
+        .toByteArray() : null);
   }
   
   public static SnapshotDiffReportEntryProto convert(DiffReportEntry entry) {
     if (entry == null) {
       return null;
     }
-    byte[] fullPath = entry.getRelativePath();
-    ByteString fullPathString = ByteString
-        .copyFrom(fullPath == null ? DFSUtil.EMPTY_BYTES : fullPath);
-    
+    ByteString sourcePath = ByteString
+        .copyFrom(entry.getSourcePath() == null ? DFSUtil.EMPTY_BYTES : entry
+            .getSourcePath());
     String modification = entry.getType().getLabel();
-    
-    SnapshotDiffReportEntryProto entryProto = SnapshotDiffReportEntryProto
-        .newBuilder().setFullpath(fullPathString)
-        .setModificationLabel(modification).build();
-    return entryProto;
+    SnapshotDiffReportEntryProto.Builder builder = SnapshotDiffReportEntryProto
+        .newBuilder().setFullpath(sourcePath)
+        .setModificationLabel(modification);
+    if (entry.getType() == DiffType.RENAME) {
+      ByteString targetPath = ByteString
+          .copyFrom(entry.getTargetPath() == null ? DFSUtil.EMPTY_BYTES : entry
+              .getTargetPath());
+      builder.setTargetPath(targetPath);
+    }
+    return builder.build();
   }
   
   public static SnapshotDiffReport convert(SnapshotDiffReportProto reportProto) {
@@ -2093,6 +2097,9 @@ public class PBHelper {
   
   public static List<XAttrProto> convertXAttrProto(
       List<XAttr> xAttrSpec) {
+    if (xAttrSpec == null) {
+      return Lists.newArrayListWithCapacity(0);
+    }
     ArrayList<XAttrProto> xAttrs = Lists.newArrayListWithCapacity(
         xAttrSpec.size());
     for (XAttr a : xAttrSpec) {

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java Sat Jul 12 02:24:40 2014
@@ -23,6 +23,8 @@ import java.io.DataInputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
@@ -75,4 +77,25 @@ public class DelegationTokenIdentifier 
       return ident.toString();
     }
   }
+  
+  public static class WebHdfsDelegationTokenIdentifier
+      extends DelegationTokenIdentifier {
+    public WebHdfsDelegationTokenIdentifier() {
+      super();
+    }
+    @Override
+    public Text getKind() {
+      return WebHdfsFileSystem.TOKEN_KIND;
+    }
+  }
+  
+  public static class SWebHdfsDelegationTokenIdentifier extends WebHdfsDelegationTokenIdentifier {
+	public SWebHdfsDelegationTokenIdentifier() {
+	  super();
+	}
+	@Override
+	public Text getKind() {
+	  return SWebHdfsFileSystem.TOKEN_KIND;
+	}
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Sat Jul 12 02:24:40 2014
@@ -189,7 +189,6 @@ public class Balancer {
   /** The maximum number of concurrent blocks moves for 
    * balancing purpose at a datanode
    */
-  public static final int MAX_NUM_CONCURRENT_MOVES = 5;
   private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
   public static final long DELAY_AFTER_ERROR = 10 * 1000L; //10 seconds
   public static final int BLOCK_MOVE_READ_TIMEOUT=20*60*1000; // 20 minutes
@@ -231,6 +230,7 @@ public class Balancer {
 
   private final ExecutorService moverExecutor;
   private final ExecutorService dispatcherExecutor;
+  private final int maxConcurrentMovesPerNode;
 
   /* This class keeps track of a scheduled block move */
   private class PendingBlockMove {
@@ -516,8 +516,8 @@ public class Balancer {
     private long scheduledSize = 0L;
     protected long delayUntil = 0L;
     //  blocks being moved but not confirmed yet
-    private final List<PendingBlockMove> pendingBlocks =
-      new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES); 
+    private final List<PendingBlockMove> pendingBlocks;
+    private final int maxConcurrentMoves;
     
     @Override
     public String toString() {
@@ -528,7 +528,8 @@ public class Balancer {
     /* Constructor 
      * Depending on avgutil & threshold, calculate maximum bytes to move 
      */
-    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) {
+    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold,
+        int maxConcurrentMoves) {
       datanode = node;
       utilization = policy.getUtilization(node);
       final double avgUtil = policy.getAvgUtilization();
@@ -545,6 +546,8 @@ public class Balancer {
         maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
       }
       this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+      this.maxConcurrentMoves = maxConcurrentMoves;
+      this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves);
     }
     
     /** Get the datanode */
@@ -606,7 +609,7 @@ public class Balancer {
     
     /* Check if the node can schedule more blocks to move */
     synchronized private boolean isPendingQNotFull() {
-      if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) {
+      if ( pendingBlocks.size() < this.maxConcurrentMoves ) {
         return true;
       }
       return false;
@@ -655,8 +658,9 @@ public class Balancer {
             = new ArrayList<BalancerBlock>();
     
     /* constructor */
-    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) {
-      super(node, policy, threshold);
+    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold,
+        int maxConcurrentMoves) {
+      super(node, policy, threshold, maxConcurrentMoves);
     }
     
     /** Add a node task */
@@ -869,6 +873,9 @@ public class Balancer {
     this.dispatcherExecutor = Executors.newFixedThreadPool(
             conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
                         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
+    this.maxConcurrentMovesPerNode =
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
   }
   
   /* Given a data node set, build a network topology and decide
@@ -908,7 +915,7 @@ public class Balancer {
       BalancerDatanode datanodeS;
       final double avg = policy.getAvgUtilization();
       if (policy.getUtilization(datanode) >= avg) {
-        datanodeS = new Source(datanode, policy, threshold);
+        datanodeS = new Source(datanode, policy, threshold, maxConcurrentMovesPerNode);
         if (isAboveAvgUtilized(datanodeS)) {
           this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
         } else {
@@ -919,7 +926,8 @@ public class Balancer {
               -threshold)*datanodeS.datanode.getCapacity()/100.0);
         }
       } else {
-        datanodeS = new BalancerDatanode(datanode, policy, threshold);
+        datanodeS = new BalancerDatanode(datanode, policy, threshold,
+            maxConcurrentMovesPerNode);
         if ( isBelowOrEqualAvgUtilized(datanodeS)) {
           this.belowAvgUtilizedDatanodes.add(datanodeS);
         } else {

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Sat Jul 12 02:24:40 2014
@@ -203,7 +203,7 @@ public class BlockInfo extends Block imp
       } else {
         // The block is on the DN but belongs to a different storage.
         // Update our state.
-        removeStorage(storage);
+        removeStorage(getStorageInfo(idx));
         added = false;      // Just updating storage. Return false.
       }
     }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Sat Jul 12 02:24:40 2014
@@ -263,8 +263,8 @@ public class BlockManager {
     heartbeatManager = datanodeManager.getHeartbeatManager();
 
     final long pendingPeriod = conf.getLong(
-        DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY,
-        DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_DEFAULT);
+        DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
+        DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
     invalidateBlocks = new InvalidateBlocks(
         datanodeManager.blockInvalidateLimit, pendingPeriod);
 
@@ -1226,8 +1226,14 @@ public class BlockManager {
     nodesToProcess = Math.min(nodes.size(), nodesToProcess);
 
     int blockCnt = 0;
-    for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
-      blockCnt += invalidateWorkForOneNode(nodes.get(nodeCnt));
+    for (DatanodeInfo dnInfo : nodes) {
+      int blocks = invalidateWorkForOneNode(dnInfo);
+      if (blocks > 0) {
+        blockCnt += blocks;
+        if (--nodesToProcess == 0) {
+          break;
+        }
+      }
     }
     return blockCnt;
   }
@@ -1749,6 +1755,7 @@ public class BlockManager {
     }
     blockLog.info("BLOCK* processReport: from storage " + storage.getStorageID()
         + " node " + nodeID + ", blocks: " + newReport.getNumberOfBlocks()
+        + ", hasStaleStorages: " + node.hasStaleStorages()
         + ", processing time: " + (endTime - startTime) + " msecs");
     return !node.hasStaleStorages();
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Sat Jul 12 02:24:40 2014
@@ -33,8 +33,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -53,8 +51,11 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+;
 
 /**
  * Scans the namesystem, scheduling blocks to be cached as appropriate.
@@ -65,8 +66,8 @@ import com.google.common.base.Preconditi
 @InterfaceAudience.LimitedPrivate({"HDFS"})
 public class CacheReplicationMonitor extends Thread implements Closeable {
 
-  private static final Log LOG =
-      LogFactory.getLog(CacheReplicationMonitor.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CacheReplicationMonitor.class);
 
   private final FSNamesystem namesystem;
 
@@ -207,7 +208,7 @@ public class CacheReplicationMonitor ext
       LOG.info("Shutting down CacheReplicationMonitor.");
       return;
     } catch (Throwable t) {
-      LOG.fatal("Thread exiting", t);
+      LOG.error("Thread exiting", t);
       terminate(1, t);
     }
   }
@@ -316,11 +317,8 @@ public class CacheReplicationMonitor ext
       scannedDirectives++;
       // Skip processing this entry if it has expired
       if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Directive " + directive.getId() + ": the directive " +
-              "expired at " + directive.getExpiryTime() + " (now = " +
-              now + ")");
-        }
+        LOG.debug("Directive {}: the directive expired at {} (now = {})",
+             directive.getId(), directive.getExpiryTime(), now);
         continue;
       }
       String path = directive.getPath();
@@ -329,17 +327,14 @@ public class CacheReplicationMonitor ext
         node = fsDir.getINode(path);
       } catch (UnresolvedLinkException e) {
         // We don't cache through symlinks
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Directive " + directive.getId() +
-              ": got UnresolvedLinkException while resolving path " + path);
-        }
+        LOG.debug("Directive {}: got UnresolvedLinkException while resolving "
+                + "path {}", directive.getId(), path
+        );
         continue;
       }
       if (node == null)  {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Directive " + directive.getId() +
-              ": No inode found at " + path);
-        }
+        LOG.debug("Directive {}: No inode found at {}", directive.getId(),
+            path);
       } else if (node.isDirectory()) {
         INodeDirectory dir = node.asDirectory();
         ReadOnlyList<INode> children = dir
@@ -352,10 +347,8 @@ public class CacheReplicationMonitor ext
       } else if (node.isFile()) {
         rescanFile(directive, node.asFile());
       } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Directive " + directive.getId() + 
-              ": ignoring non-directive, non-file inode " + node);
-        }
+        LOG.debug("Directive {}: ignoring non-directive, non-file inode {} ",
+            directive.getId(), node);
       }
     }
   }
@@ -381,15 +374,13 @@ public class CacheReplicationMonitor ext
     // do not cache this file.
     CachePool pool = directive.getPool();
     if (pool.getBytesNeeded() > pool.getLimit()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Directive %d: not scanning file %s because " +
-            "bytesNeeded for pool %s is %d, but the pool's limit is %d",
-            directive.getId(),
-            file.getFullPathName(),
-            pool.getPoolName(),
-            pool.getBytesNeeded(),
-            pool.getLimit()));
-      }
+      LOG.debug("Directive {}: not scanning file {} because " +
+          "bytesNeeded for pool {} is {}, but the pool's limit is {}",
+          directive.getId(),
+          file.getFullPathName(),
+          pool.getPoolName(),
+          pool.getBytesNeeded(),
+          pool.getLimit());
       return;
     }
 
@@ -397,11 +388,10 @@ public class CacheReplicationMonitor ext
     for (BlockInfo blockInfo : blockInfos) {
       if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) {
         // We don't try to cache blocks that are under construction.
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Directive " + directive.getId() + ": can't cache " +
-              "block " + blockInfo + " because it is in state " +
-              blockInfo.getBlockUCState() + ", not COMPLETE.");
-        }
+        LOG.trace("Directive {}: can't cache block {} because it is in state "
+                + "{}, not COMPLETE.", directive.getId(), blockInfo,
+            blockInfo.getBlockUCState()
+        );
         continue;
       }
       Block block = new Block(blockInfo.getBlockId());
@@ -415,7 +405,7 @@ public class CacheReplicationMonitor ext
         // Update bytesUsed using the current replication levels.
         // Assumptions: we assume that all the blocks are the same length
         // on each datanode.  We can assume this because we're only caching
-        // blocks in state COMMITTED.
+        // blocks in state COMPLETE.
         // Note that if two directives are caching the same block(s), they will
         // both get them added to their bytesCached.
         List<DatanodeDescriptor> cachedOn =
@@ -441,21 +431,16 @@ public class CacheReplicationMonitor ext
           ocblock.setReplicationAndMark(directive.getReplication(), mark);
         }
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Directive " + directive.getId() + ": setting replication " +
-                "for block " + blockInfo + " to " + ocblock.getReplication());
-      }
+      LOG.trace("Directive {}: setting replication for block {} to {}",
+          directive.getId(), blockInfo, ocblock.getReplication());
     }
     // Increment the "cached" statistics
     directive.addBytesCached(cachedTotal);
     if (cachedTotal == neededTotal) {
       directive.addFilesCached(1);
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Directive " + directive.getId() + ": caching " +
-          file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal +
-          " bytes");
-    }
+    LOG.debug("Directive {}: caching {}: {}/{} bytes", directive.getId(),
+        file.getFullPathName(), cachedTotal, neededTotal);
   }
 
   private String findReasonForNotCaching(CachedBlock cblock, 
@@ -512,11 +497,9 @@ public class CacheReplicationMonitor ext
           iter.hasNext(); ) {
         DatanodeDescriptor datanode = iter.next();
         if (!cblock.isInList(datanode.getCached())) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
-                "PENDING_UNCACHED for node " + datanode.getDatanodeUuid() +
-                "because the DataNode uncached it.");
-          }
+          LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} "
+              + "because the DataNode uncached it.", cblock.getBlockId(),
+              datanode.getDatanodeUuid());
           datanode.getPendingUncached().remove(cblock);
           iter.remove();
         }
@@ -526,10 +509,8 @@ public class CacheReplicationMonitor ext
       String reason = findReasonForNotCaching(cblock, blockInfo);
       int neededCached = 0;
       if (reason != null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Block " + cblock.getBlockId() + ": can't cache " +
-              "block because it is " + reason);
-        }
+        LOG.trace("Block {}: can't cache block because it is {}",
+            cblock.getBlockId(), reason);
       } else {
         neededCached = cblock.getReplication();
       }
@@ -541,12 +522,12 @@ public class CacheReplicationMonitor ext
           DatanodeDescriptor datanode = iter.next();
           datanode.getPendingCached().remove(cblock);
           iter.remove();
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
-                "PENDING_CACHED for node " + datanode.getDatanodeUuid() +
-                "because we already have " + numCached + " cached " +
-                "replicas and we only need " + neededCached);
-          }
+          LOG.trace("Block {}: removing from PENDING_CACHED for node {}"
+                  + "because we already have {} cached replicas and we only" +
+                  " need {}",
+              cblock.getBlockId(), datanode.getDatanodeUuid(), numCached,
+              neededCached
+          );
         }
       }
       if (numCached < neededCached) {
@@ -556,12 +537,11 @@ public class CacheReplicationMonitor ext
           DatanodeDescriptor datanode = iter.next();
           datanode.getPendingUncached().remove(cblock);
           iter.remove();
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
-                "PENDING_UNCACHED for node " + datanode.getDatanodeUuid() +
-                "because we only have " + numCached + " cached replicas " +
-                "and we need " + neededCached);
-          }
+          LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} "
+                  + "because we only have {} cached replicas and we need " +
+                  "{}", cblock.getBlockId(), datanode.getDatanodeUuid(),
+              numCached, neededCached
+          );
         }
       }
       int neededUncached = numCached -
@@ -581,11 +561,10 @@ public class CacheReplicationMonitor ext
           pendingUncached.isEmpty() &&
           pendingCached.isEmpty()) {
         // we have nothing more to do with this block.
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
-              "cachedBlocks, since neededCached == 0, and " +
-              "pendingUncached and pendingCached are empty.");
-        }
+        LOG.trace("Block {}: removing from cachedBlocks, since neededCached "
+                + "== 0, and pendingUncached and pendingCached are empty.",
+            cblock.getBlockId()
+        );
         cbIter.remove();
       }
     }
@@ -643,18 +622,14 @@ public class CacheReplicationMonitor ext
     BlockInfo blockInfo = blockManager.
           getStoredBlock(new Block(cachedBlock.getBlockId()));
     if (blockInfo == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block " + cachedBlock.getBlockId() + ": can't add new " +
-            "cached replicas, because there is no record of this block " +
-            "on the NameNode.");
-      }
+      LOG.debug("Block {}: can't add new cached replicas," +
+          " because there is no record of this block " +
+          "on the NameNode.", cachedBlock.getBlockId());
       return;
     }
     if (!blockInfo.isComplete()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block " + cachedBlock.getBlockId() + ": can't cache this " +
-            "block, because it is not yet complete.");
-      }
+      LOG.debug("Block {}: can't cache this block, because it is not yet"
+          + " complete.", cachedBlock.getBlockId());
       return;
     }
     // Filter the list of replicas to only the valid targets
@@ -678,7 +653,7 @@ public class CacheReplicationMonitor ext
       if (pendingCached.contains(datanode) || cached.contains(datanode)) {
         continue;
       }
-      long pendingCapacity = datanode.getCacheRemaining();
+      long pendingBytes = 0;
       // Subtract pending cached blocks from effective capacity
       Iterator<CachedBlock> it = datanode.getPendingCached().iterator();
       while (it.hasNext()) {
@@ -686,7 +661,7 @@ public class CacheReplicationMonitor ext
         BlockInfo info =
             blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
         if (info != null) {
-          pendingCapacity -= info.getNumBytes();
+          pendingBytes -= info.getNumBytes();
         }
       }
       it = datanode.getPendingUncached().iterator();
@@ -696,17 +671,17 @@ public class CacheReplicationMonitor ext
         BlockInfo info =
             blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
         if (info != null) {
-          pendingCapacity += info.getNumBytes();
+          pendingBytes += info.getNumBytes();
         }
       }
+      long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
       if (pendingCapacity < blockInfo.getNumBytes()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Block " + blockInfo.getBlockId() + ": DataNode " +
-              datanode.getDatanodeUuid() + " is not a valid possibility " +
-              "because the block has size " + blockInfo.getNumBytes() + ", but " +
-              "the DataNode only has " + datanode.getCacheRemaining() + " " +
-              "bytes of cache remaining.");
-        }
+        LOG.trace("Block {}: DataNode {} is not a valid possibility " +
+            "because the block has size {}, but the DataNode only has {}" +
+            "bytes of cache remaining ({} pending bytes, {} already cached.",
+            blockInfo.getBlockId(), datanode.getDatanodeUuid(),
+            blockInfo.getNumBytes(), pendingCapacity, pendingBytes,
+            datanode.getCacheRemaining());
         outOfCapacity++;
         continue;
       }
@@ -715,22 +690,20 @@ public class CacheReplicationMonitor ext
     List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities,
         neededCached, blockManager.getDatanodeManager().getStaleInterval());
     for (DatanodeDescriptor datanode : chosen) {
-      if (LOG.isTraceEnabled()) {
-          LOG.trace("Block " + blockInfo.getBlockId() + ": added to " +
-              "PENDING_CACHED on DataNode " + datanode.getDatanodeUuid());
-      }
+      LOG.trace("Block {}: added to PENDING_CACHED on DataNode {}",
+          blockInfo.getBlockId(), datanode.getDatanodeUuid());
       pendingCached.add(datanode);
       boolean added = datanode.getPendingCached().add(cachedBlock);
       assert added;
     }
     // We were unable to satisfy the requested replication factor
     if (neededCached > chosen.size()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block " + blockInfo.getBlockId() + ": we only have " +
-            (cachedBlock.getReplication() - neededCached + chosen.size()) +
-            " of " + cachedBlock.getReplication() + " cached replicas.  " +
-            outOfCapacity + " DataNodes have insufficient cache capacity.");
-      }
+      LOG.debug("Block {}: we only have {} of {} cached replicas."
+              + " {} DataNodes have insufficient cache capacity.",
+          blockInfo.getBlockId(),
+          (cachedBlock.getReplication() - neededCached + chosen.size()),
+          cachedBlock.getReplication(), outOfCapacity
+      );
     }
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Sat Jul 12 02:24:40 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdfs.DFSUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -67,8 +68,8 @@ class InvalidateBlocks {
   }
 
   private void printBlockDeletionTime(final Log log) {
-    log.info(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY
-        + " is set to " + pendingPeriodInMs + " ms.");
+    log.info(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY
+        + " is set to " + DFSUtil.durationToString(pendingPeriodInMs));
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy MMM dd HH:mm:ss");
     Calendar calendar = new GregorianCalendar();
     calendar.add(Calendar.SECOND, (int) (this.pendingPeriodInMs / 1000));

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Sat Jul 12 02:24:40 2014
@@ -92,7 +92,8 @@ public final class HdfsServerConstants {
     RECOVER  ("-recover"),
     FORCE("-force"),
     NONINTERACTIVE("-nonInteractive"),
-    RENAMERESERVED("-renameReserved");
+    RENAMERESERVED("-renameReserved"),
+    METADATAVERSION("-metadataVersion");
 
     private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
         "(\\w+)\\((\\w+)\\)");

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Jul 12 02:24:40 2014
@@ -745,15 +745,19 @@ public class DataNode extends Configured
             " size (%s) is greater than zero and native code is not available.",
             DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
       }
-      long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
-      if (dnConf.maxLockedMemory > ulimit) {
-      throw new RuntimeException(String.format(
-          "Cannot start datanode because the configured max locked memory" +
-          " size (%s) of %d bytes is more than the datanode's available" +
-          " RLIMIT_MEMLOCK ulimit of %d bytes.",
-          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
-          dnConf.maxLockedMemory,
-          ulimit));
+      if (Path.WINDOWS) {
+        NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
+      } else {
+        long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
+        if (dnConf.maxLockedMemory > ulimit) {
+          throw new RuntimeException(String.format(
+            "Cannot start datanode because the configured max locked memory" +
+            " size (%s) of %d bytes is more than the datanode's available" +
+            " RLIMIT_MEMLOCK ulimit of %d bytes.",
+            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+            dnConf.maxLockedMemory,
+            ulimit));
+        }
       }
     }
     LOG.info("Starting DataNode with maxLockedMemory = " +
@@ -778,7 +782,8 @@ public class DataNode extends Configured
     initIpcServer(conf);
 
     metrics = DataNodeMetrics.create(conf, getDisplayName());
-
+    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
+    
     blockPoolManager = new BlockPoolManager(this);
     blockPoolManager.refreshNamenodes(conf);
 
@@ -2298,11 +2303,11 @@ public class DataNode extends Configured
 
   @Override // ClientDataNodeProtocol
   public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
-    checkWriteAccess(block);
+    checkReadAccess(block);
     return data.getReplicaVisibleLength(block);
   }
 
-  private void checkWriteAccess(final ExtendedBlock block) throws IOException {
+  private void checkReadAccess(final ExtendedBlock block) throws IOException {
     if (isBlockTokenEnabled) {
       Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
           .getTokenIdentifiers();

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Sat Jul 12 02:24:40 2014
@@ -63,14 +63,17 @@ class DataXceiverServer implements Runna
    */
   static class BlockBalanceThrottler extends DataTransferThrottler {
    private int numThreads;
+   private int maxThreads;
    
    /**Constructor
     * 
     * @param bandwidth Total amount of bandwidth can be used for balancing 
     */
-   private BlockBalanceThrottler(long bandwidth) {
+   private BlockBalanceThrottler(long bandwidth, int maxThreads) {
      super(bandwidth);
+     this.maxThreads = maxThreads;
      LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
+     LOG.info("Number threads for balancing is "+ maxThreads);
    }
    
    /** Check if the block move can start. 
@@ -79,7 +82,7 @@ class DataXceiverServer implements Runna
     * the counter is incremented; False otherwise.
     */
    synchronized boolean acquire() {
-     if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
+     if (numThreads >= maxThreads) {
        return false;
      }
      numThreads++;
@@ -120,8 +123,10 @@ class DataXceiverServer implements Runna
     
     //set up parameter for cluster balancing
     this.balanceThrottler = new BlockBalanceThrottler(
-      conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 
-                   DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
+        conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+            DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));
   }
 
   @Override

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Sat Jul 12 02:24:40 2014
@@ -37,8 +37,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
@@ -47,6 +45,8 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2)
@@ -101,7 +101,8 @@ public class FsDatasetCache {
     }
   }
 
-  private static final Log LOG = LogFactory.getLog(FsDatasetCache.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FsDatasetCache
+      .class);
 
   /**
    * Stores MappableBlock objects and the states they're in.
@@ -245,21 +246,17 @@ public class FsDatasetCache {
     ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
     Value prevValue = mappableBlockMap.get(key);
     if (prevValue != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block with id " + blockId + ", pool " + bpid +
-            " already exists in the FsDatasetCache with state " +
-            prevValue.state);
-      }
+      LOG.debug("Block with id {}, pool {} already exists in the "
+              + "FsDatasetCache with state {}", blockId, bpid, prevValue.state
+      );
       numBlocksFailedToCache.incrementAndGet();
       return;
     }
     mappableBlockMap.put(key, new Value(null, State.CACHING));
     volumeExecutor.execute(
         new CachingTask(key, blockFileName, length, genstamp));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Initiating caching for Block with id " + blockId +
-          ", pool " + bpid);
-    }
+    LOG.debug("Initiating caching for Block with id {}, pool {}", blockId,
+        bpid);
   }
 
   synchronized void uncacheBlock(String bpid, long blockId) {
@@ -270,44 +267,34 @@ public class FsDatasetCache {
             processBlockMunlockRequest(key)) {
       // TODO: we probably want to forcibly uncache the block (and close the 
       // shm) after a certain timeout has elapsed.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(key + " is anchored, and can't be uncached now.");
-      }
+      LOG.debug("{} is anchored, and can't be uncached now.", key);
       return;
     }
     if (prevValue == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
-            "does not need to be uncached, because it is not currently " +
-            "in the mappableBlockMap.");
-      }
+      LOG.debug("Block with id {}, pool {} does not need to be uncached, "
+          + "because it is not currently in the mappableBlockMap.", blockId,
+          bpid);
       numBlocksFailedToUncache.incrementAndGet();
       return;
     }
     switch (prevValue.state) {
     case CACHING:
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Cancelling caching for block with id " + blockId +
-            ", pool " + bpid + ".");
-      }
+      LOG.debug("Cancelling caching for block with id {}, pool {}.", blockId,
+          bpid);
       mappableBlockMap.put(key,
           new Value(prevValue.mappableBlock, State.CACHING_CANCELLED));
       break;
     case CACHED:
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
-            "has been scheduled for uncaching.");
-      }
+      LOG.debug(
+          "Block with id {}, pool {} has been scheduled for uncaching" + ".",
+          blockId, bpid);
       mappableBlockMap.put(key,
           new Value(prevValue.mappableBlock, State.UNCACHING));
       uncachingExecutor.execute(new UncachingTask(key));
       break;
     default:
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
-            "does not need to be uncached, because it is " +
-            "in state " + prevValue.state + ".");
-      }
+      LOG.debug("Block with id {}, pool {} does not need to be uncached, "
+          + "because it is in state {}.", blockId, bpid, prevValue.state);
       numBlocksFailedToUncache.incrementAndGet();
       break;
     }
@@ -386,10 +373,8 @@ public class FsDatasetCache {
           }
           mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED));
         }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Successfully cached " + key + ".  We are now caching " +
-              newUsedBytes + " bytes in total.");
-        }
+        LOG.debug("Successfully cached {}.  We are now caching {} bytes in"
+            + " total.", key, newUsedBytes);
         dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key);
         numBlocksCached.addAndGet(1);
         dataset.datanode.getMetrics().incrBlocksCached(1);
@@ -399,12 +384,10 @@ public class FsDatasetCache {
         IOUtils.closeQuietly(metaIn);
         if (!success) {
           if (reservedBytes) {
-            newUsedBytes = usedBytesCount.release(length);
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Caching of " + key + " was aborted.  We are now " +
-                "caching only " + newUsedBytes + " + bytes in total.");
+            usedBytesCount.release(length);
           }
+          LOG.debug("Caching of {} was aborted.  We are now caching only {} "
+                  + "bytes in total.", key, usedBytesCount.get());
           if (mappableBlock != null) {
             mappableBlock.close();
           }
@@ -444,10 +427,7 @@ public class FsDatasetCache {
           usedBytesCount.release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       dataset.datanode.getMetrics().incrBlocksUncached(1);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Uncaching of " + key + " completed.  " +
-            "usedBytes = " + newUsedBytes);
-      }
+      LOG.debug("Uncaching of {} completed. usedBytes = {}", key, newUsedBytes);
     }
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Sat Jul 12 02:24:40 2014
@@ -90,13 +90,15 @@ public class DataNodeMetrics {
   final MutableQuantiles[] sendDataPacketBlockedOnNetworkNanosQuantiles;
   @Metric MutableRate sendDataPacketTransferNanos;
   final MutableQuantiles[] sendDataPacketTransferNanosQuantiles;
-  
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
-
-  public DataNodeMetrics(String name, String sessionId, int[] intervals) {
+  JvmMetrics jvmMetrics = null;
+  
+  public DataNodeMetrics(String name, String sessionId, int[] intervals,
+      final JvmMetrics jvmMetrics) {
     this.name = name;
+    this.jvmMetrics = jvmMetrics;    
     registry.tag(SessionId, sessionId);
     
     final int len = intervals.length;
@@ -131,7 +133,7 @@ public class DataNodeMetrics {
   public static DataNodeMetrics create(Configuration conf, String dnName) {
     String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
     MetricsSystem ms = DefaultMetricsSystem.instance();
-    JvmMetrics.create("DataNode", sessionId, ms);
+    JvmMetrics jm = JvmMetrics.create("DataNode", sessionId, ms);
     String name = "DataNodeActivity-"+ (dnName.isEmpty()
         ? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() 
             : dnName.replace(':', '-'));
@@ -141,11 +143,15 @@ public class DataNodeMetrics {
         conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
     
     return ms.register(name, null, new DataNodeMetrics(name, sessionId,
-        intervals));
+        intervals, jm));
   }
 
   public String name() { return name; }
 
+  public JvmMetrics getJvmMetrics() {
+    return jvmMetrics;
+  }
+  
   public void addHeartbeat(long latency) {
     heartbeats.add(latency);
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Sat Jul 12 02:24:40 2014
@@ -43,8 +43,6 @@ import java.util.TreeMap;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
@@ -85,6 +83,8 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -99,7 +99,7 @@ import com.google.common.collect.Lists;
  */
 @InterfaceAudience.LimitedPrivate({"HDFS"})
 public final class CacheManager {
-  public static final Log LOG = LogFactory.getLog(CacheManager.class);
+  public static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);
 
   private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f;
 
@@ -205,8 +205,8 @@ public final class CacheManager {
           DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT,
           DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT);
     if (cachedBlocksPercent < MIN_CACHED_BLOCKS_PERCENT) {
-      LOG.info("Using minimum value " + MIN_CACHED_BLOCKS_PERCENT +
-        " for " + DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT);
+      LOG.info("Using minimum value {} for {}", MIN_CACHED_BLOCKS_PERCENT,
+        DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT);
       cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT;
     }
     this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>(
@@ -346,10 +346,8 @@ public final class CacheManager {
    */
   private static long validateExpiryTime(CacheDirectiveInfo info,
       long maxRelativeExpiryTime) throws InvalidRequestException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Validating directive " + info
-          + " pool maxRelativeExpiryTime " + maxRelativeExpiryTime);
-    }
+    LOG.trace("Validating directive {} pool maxRelativeExpiryTime {}", info,
+        maxRelativeExpiryTime);
     final long now = new Date().getTime();
     final long maxAbsoluteExpiryTime = now + maxRelativeExpiryTime;
     if (info == null || info.getExpiration() == null) {
@@ -539,7 +537,7 @@ public final class CacheManager {
       LOG.warn("addDirective of " + info + " failed: ", e);
       throw e;
     }
-    LOG.info("addDirective of " + info + " successful.");
+    LOG.info("addDirective of {} successful.", info);
     return directive.toInfo();
   }
 
@@ -641,8 +639,7 @@ public final class CacheManager {
       LOG.warn("modifyDirective of " + idString + " failed: ", e);
       throw e;
     }
-    LOG.info("modifyDirective of " + idString + " successfully applied " +
-        info+ ".");
+    LOG.info("modifyDirective of {} successfully applied {}.", idString, info);
   }
 
   private void removeInternal(CacheDirective directive)
@@ -779,7 +776,7 @@ public final class CacheManager {
       LOG.info("addCachePool of " + info + " failed: ", e);
       throw e;
     }
-    LOG.info("addCachePool of " + info + " successful.");
+    LOG.info("addCachePool of {} successful.", info);
     return pool.getInfo(true);
   }
 
@@ -842,8 +839,8 @@ public final class CacheManager {
       LOG.info("modifyCachePool of " + info + " failed: ", e);
       throw e;
     }
-    LOG.info("modifyCachePool of " + info.getPoolName() + " successful; "
-        + bld.toString());
+    LOG.info("modifyCachePool of {} successful; {}", info.getPoolName(), 
+        bld.toString());
   }
 
   /**
@@ -935,11 +932,9 @@ public final class CacheManager {
     if (metrics != null) {
       metrics.addCacheBlockReport((int) (endTime - startTime));
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processed cache report from "
-          + datanodeID + ", blocks: " + blockIds.size()
-          + ", processing time: " + (endTime - startTime) + " msecs");
-    }
+    LOG.debug("Processed cache report from {}, blocks: {}, " +
+        "processing time: {} msecs", datanodeID, blockIds.size(), 
+        (endTime - startTime));
   }
 
   private void processCacheReportImpl(final DatanodeDescriptor datanode,
@@ -950,6 +945,8 @@ public final class CacheManager {
     CachedBlocksList pendingCachedList = datanode.getPendingCached();
     for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
       long blockId = iter.next();
+      LOG.trace("Cache report from datanode {} has block {}", datanode,
+          blockId);
       CachedBlock cachedBlock =
           new CachedBlock(blockId, (short)0, false);
       CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock);
@@ -959,15 +956,18 @@ public final class CacheManager {
         cachedBlock = prevCachedBlock;
       } else {
         cachedBlocks.put(cachedBlock);
+        LOG.trace("Added block {}  to cachedBlocks", cachedBlock);
       }
       // Add the block to the datanode's implicit cached block list
       // if it's not already there.  Similarly, remove it from the pending
       // cached block list if it exists there.
       if (!cachedBlock.isPresent(cachedList)) {
         cachedList.add(cachedBlock);
+        LOG.trace("Added block {} to CACHED list.", cachedBlock);
       }
       if (cachedBlock.isPresent(pendingCachedList)) {
         pendingCachedList.remove(cachedBlock);
+        LOG.trace("Removed block {} from PENDING_CACHED list.", cachedBlock);
       }
     }
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Sat Jul 12 02:24:40 2014
@@ -252,7 +252,7 @@ class Checkpointer extends Daemon {
     
     backupNode.namesystem.writeLock();
     try {
-      backupNode.namesystem.dir.setReady();
+      backupNode.namesystem.setImageLoaded();
       if(backupNode.namesystem.getBlocksTotal() > 0) {
         backupNode.namesystem.setBlockTotal();
       }



Mime
View raw message