From: cnaur...@apache.org
Subject: svn commit: r1552467 [2/4] - in /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/dev-support/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache/...
Date: Fri, 20 Dec 2013 01:01:23 GMT
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Fri Dec 20 01:01:18 2013
@@ -25,10 +25,12 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import java.util.EnumSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -455,8 +457,8 @@ public class RemoteBlockReader2  impleme
   }
 
   @Override
-  public ClientMmap getClientMmap(LocatedBlock curBlock,
-      ClientMmapManager manager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+        ClientMmapManager mmapManager) {
     return null;
   }
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java Fri Dec 20 01:01:18 2013
@@ -19,10 +19,12 @@ package org.apache.hadoop.hdfs.client;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.EnumSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -131,25 +133,26 @@ public class HdfsAdmin {
    * Add a new CacheDirectiveInfo.
    * 
    * @param info Information about a directive to add.
+   * @param flags {@link CacheFlag}s to use for this operation.
    * @return the ID of the directive that was created.
    * @throws IOException if the directive could not be added
    */
-  public long addCacheDirective(CacheDirectiveInfo info)
-      throws IOException {
-    return dfs.addCacheDirective(info);
+  public long addCacheDirective(CacheDirectiveInfo info,
+      EnumSet<CacheFlag> flags) throws IOException {
+    return dfs.addCacheDirective(info, flags);
   }
   
   /**
    * Modify a CacheDirective.
    * 
-   * @param info Information about the directive to modify.
-   *             You must set the ID to indicate which CacheDirective you want
-   *             to modify.
+   * @param info Information about the directive to modify. You must set the ID
+   *          to indicate which CacheDirective you want to modify.
+   * @param flags {@link CacheFlag}s to use for this operation.
    * @throws IOException if the directive could not be modified
    */
-  public void modifyCacheDirective(CacheDirectiveInfo info)
-      throws IOException {
-    dfs.modifyCacheDirective(info);
+  public void modifyCacheDirective(CacheDirectiveInfo info,
+      EnumSet<CacheFlag> flags) throws IOException {
+    dfs.modifyCacheDirective(info, flags);
   }
 
   /**

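A minimal sketch of how the new client-facing signatures read, assuming a
hypothetical NameNode URI, path, and pool name (none of these are part of
this change):

    import java.net.URI;
    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CacheFlag;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;
    import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;

    public class CacheFlagExample {
      public static void main(String[] args) throws Exception {
        HdfsAdmin admin =
            new HdfsAdmin(URI.create("hdfs://nn:8020"), new Configuration());
        CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
            .setPath(new Path("/warm/data"))   // hypothetical path
            .setPool("research")               // hypothetical pool
            .build();
        // An empty flag set keeps the conservative behavior: the request is
        // rejected if it would exceed the pool's byte limit.
        long id = admin.addCacheDirective(info, EnumSet.noneOf(CacheFlag.class));
        // CacheFlag.FORCE bypasses the pool capacity check.
        admin.modifyCacheDirective(
            new CacheDirectiveInfo.Builder(info).setId(id).build(),
            EnumSet.of(CacheFlag.FORCE));
      }
    }
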
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java Fri Dec 20 01:01:18 2013
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hdfs.protocol;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import javax.annotation.Nullable;
@@ -32,14 +30,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.util.XMLUtils;
-import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
-import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
-import org.apache.hadoop.io.Text;
-import org.xml.sax.ContentHandler;
-import org.xml.sax.SAXException;
 
 /**
  * CachePoolInfo describes a cache pool.
@@ -64,7 +54,7 @@ public class CachePoolInfo {
   FsPermission mode;
 
   @Nullable
-  Integer weight;
+  Long limit;
 
   public CachePoolInfo(String poolName) {
     this.poolName = poolName;
@@ -101,12 +91,12 @@ public class CachePoolInfo {
     return this;
   }
 
-  public Integer getWeight() {
-    return weight;
+  public Long getLimit() {
+    return limit;
   }
 
-  public CachePoolInfo setWeight(Integer weight) {
-    this.weight = weight;
+  public CachePoolInfo setLimit(Long bytes) {
+    this.limit = bytes;
     return this;
   }
 
@@ -117,7 +107,7 @@ public class CachePoolInfo {
       append(", groupName:").append(groupName).
       append(", mode:").append((mode == null) ? "null" :
           String.format("0%03o", mode.toShort())).
-      append(", weight:").append(weight).
+      append(", limit:").append(limit).
       append("}").toString();
   }
   
@@ -134,7 +124,7 @@ public class CachePoolInfo {
         append(ownerName, other.ownerName).
         append(groupName, other.groupName).
         append(mode, other.mode).
-        append(weight, other.weight).
+        append(limit, other.limit).
         isEquals();
   }
 
@@ -145,7 +135,7 @@ public class CachePoolInfo {
         append(ownerName).
         append(groupName).
         append(mode).
-        append(weight).
+        append(limit).
         hashCode();
   }
 
@@ -153,8 +143,8 @@ public class CachePoolInfo {
     if (info == null) {
       throw new InvalidRequestException("CachePoolInfo is null");
     }
-    if ((info.getWeight() != null) && (info.getWeight() < 0)) {
-      throw new InvalidRequestException("CachePool weight is negative.");
+    if ((info.getLimit() != null) && (info.getLimit() < 0)) {
+      throw new InvalidRequestException("Limit is negative.");
     }
     validateName(info.poolName);
   }
@@ -167,66 +157,4 @@ public class CachePoolInfo {
       throw new IOException("invalid empty cache pool name");
     }
   }
-
-  public static CachePoolInfo readFrom(DataInput in) throws IOException {
-    String poolName = Text.readString(in);
-    CachePoolInfo info = new CachePoolInfo(poolName);
-    if (in.readBoolean()) {
-      info.setOwnerName(Text.readString(in));
-    }
-    if (in.readBoolean())  {
-      info.setGroupName(Text.readString(in));
-    }
-    if (in.readBoolean()) {
-      info.setMode(FsPermission.read(in));
-    }
-    if (in.readBoolean()) {
-      info.setWeight(in.readInt());
-    }
-    return info;
-  }
-
-  public void writeTo(DataOutput out) throws IOException {
-    Text.writeString(out, poolName);
-    boolean hasOwner, hasGroup, hasMode, hasWeight;
-    hasOwner = ownerName != null;
-    hasGroup = groupName != null;
-    hasMode = mode != null;
-    hasWeight = weight != null;
-    out.writeBoolean(hasOwner);
-    if (hasOwner) {
-      Text.writeString(out, ownerName);
-    }
-    out.writeBoolean(hasGroup);
-    if (hasGroup) {
-      Text.writeString(out, groupName);
-    }
-    out.writeBoolean(hasMode);
-    if (hasMode) {
-      mode.write(out);
-    }
-    out.writeBoolean(hasWeight);
-    if (hasWeight) {
-      out.writeInt(weight);
-    }
-  }
-
-  public void writeXmlTo(ContentHandler contentHandler) throws SAXException {
-    XMLUtils.addSaxString(contentHandler, "POOLNAME", poolName);
-    PermissionStatus perm = new PermissionStatus(ownerName,
-        groupName, mode);
-    FSEditLogOp.permissionStatusToXml(contentHandler, perm);
-    XMLUtils.addSaxString(contentHandler, "WEIGHT", Integer.toString(weight));
-  }
-
-  public static CachePoolInfo readXmlFrom(Stanza st) throws InvalidXmlException {
-    String poolName = st.getValue("POOLNAME");
-    PermissionStatus perm = FSEditLogOp.permissionStatusFromXml(st);
-    int weight = Integer.parseInt(st.getValue("WEIGHT"));
-    return new CachePoolInfo(poolName).
-        setOwnerName(perm.getUserName()).
-        setGroupName(perm.getGroupName()).
-        setMode(perm.getPermission()).
-        setWeight(weight);
-  }
 }

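The rename from weight to limit is visible at call sites through the
chainable setters; a brief sketch with illustrative values:

    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

    public class CachePoolLimitExample {
      public static void main(String[] args) throws Exception {
        CachePoolInfo info = new CachePoolInfo("research") // hypothetical pool
            .setOwnerName("alice")
            .setGroupName("eng")
            .setMode(new FsPermission((short) 0755))
            .setLimit(10L * 1024 * 1024 * 1024); // byte limit replaces weight
        CachePoolInfo.validate(info); // now rejects a negative limit
        System.out.println(info);     // toString() prints ", limit:..."
      }
    }
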
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java Fri Dec 20 01:01:18 2013
@@ -30,6 +30,7 @@ public class CachePoolStats {
   public static class Builder {
     private long bytesNeeded;
     private long bytesCached;
+    private long bytesOverlimit;
     private long filesNeeded;
     private long filesCached;
 
@@ -46,6 +47,11 @@ public class CachePoolStats {
       return this;
     }
 
+    public Builder setBytesOverlimit(long bytesOverlimit) {
+      this.bytesOverlimit = bytesOverlimit;
+      return this;
+    }
+
     public Builder setFilesNeeded(long filesNeeded) {
       this.filesNeeded = filesNeeded;
       return this;
@@ -57,20 +63,22 @@ public class CachePoolStats {
     }
 
     public CachePoolStats build() {
-      return new CachePoolStats(bytesNeeded, bytesCached, filesNeeded,
-          filesCached);
+      return new CachePoolStats(bytesNeeded, bytesCached, bytesOverlimit,
+          filesNeeded, filesCached);
     }
   };
 
   private final long bytesNeeded;
   private final long bytesCached;
+  private final long bytesOverlimit;
   private final long filesNeeded;
   private final long filesCached;
 
-  private CachePoolStats(long bytesNeeded, long bytesCached, long filesNeeded,
-      long filesCached) {
+  private CachePoolStats(long bytesNeeded, long bytesCached,
+      long bytesOverlimit, long filesNeeded, long filesCached) {
     this.bytesNeeded = bytesNeeded;
     this.bytesCached = bytesCached;
+    this.bytesOverlimit = bytesOverlimit;
     this.filesNeeded = filesNeeded;
     this.filesCached = filesCached;
   }
@@ -83,6 +91,10 @@ public class CachePoolStats {
     return bytesCached;
   }
 
+  public long getBytesOverlimit() {
+    return bytesOverlimit;
+  }
+
   public long getFilesNeeded() {
     return filesNeeded;
   }
@@ -95,6 +107,7 @@ public class CachePoolStats {
     return new StringBuilder().append("{").
       append("bytesNeeded:").append(bytesNeeded).
       append(", bytesCached:").append(bytesCached).
+      append(", bytesOverlimit:").append(bytesOverlimit).
       append(", filesNeeded:").append(filesNeeded).
       append(", filesCached:").append(filesCached).
       append("}").toString();

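A short sketch of the extended Builder (the values are arbitrary):

    import org.apache.hadoop.hdfs.protocol.CachePoolStats;

    public class CachePoolStatsExample {
      public static void main(String[] args) {
        CachePoolStats stats = new CachePoolStats.Builder()
            .setBytesNeeded(4096)
            .setBytesCached(1024)
            .setBytesOverlimit(2048) // new field added by this change
            .setFilesNeeded(2)
            .setFilesCached(1)
            .build();
        // toString() now reports bytesOverlimit after bytesCached.
        System.out.println(stats);
      }
    }
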
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Dec 20 01:01:18 2013
@@ -19,10 +19,12 @@ package org.apache.hadoop.hdfs.protocol;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -1103,23 +1105,24 @@ public interface ClientProtocol {
    * Add a CacheDirective to the CacheManager.
    * 
    * @param directive A CacheDirectiveInfo to be added
+   * @param flags {@link CacheFlag}s to use for this operation.
    * @return A CacheDirectiveInfo associated with the added directive
    * @throws IOException if the directive could not be added
    */
   @AtMostOnce
-  public long addCacheDirective(
-      CacheDirectiveInfo directive) throws IOException;
+  public long addCacheDirective(CacheDirectiveInfo directive,
+      EnumSet<CacheFlag> flags) throws IOException;
 
   /**
    * Modify a CacheDirective in the CacheManager.
    * 
-   * @return directive The directive to modify.  Must contain 
-   *                   a directive ID.
+   * @param directive The directive to modify. Must contain a directive ID.
+   * @param flags {@link CacheFlag}s to use for this operation.
    * @throws IOException if the directive could not be modified
    */
   @AtMostOnce
-  public void modifyCacheDirective(
-      CacheDirectiveInfo directive) throws IOException;
+  public void modifyCacheDirective(CacheDirectiveInfo directive,
+      EnumSet<CacheFlag> flags) throws IOException;
 
   /**
    * Remove a CacheDirectiveInfo from the CacheManager.

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Fri Dec 20 01:01:18 2013
@@ -350,7 +350,7 @@ public class ClientNamenodeProtocolServe
     try {
       HdfsFileStatus result = server.create(req.getSrc(),
           PBHelper.convert(req.getMasked()), req.getClientName(),
-          PBHelper.convert(req.getCreateFlag()), req.getCreateParent(),
+          PBHelper.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(),
           (short) req.getReplication(), req.getBlockSize());
 
       if (result != null) {
@@ -1064,9 +1064,11 @@ public class ClientNamenodeProtocolServe
       RpcController controller, AddCacheDirectiveRequestProto request)
       throws ServiceException {
     try {
+      long id = server.addCacheDirective(
+          PBHelper.convert(request.getInfo()),
+          PBHelper.convertCacheFlags(request.getCacheFlags()));
       return AddCacheDirectiveResponseProto.newBuilder().
-              setId(server.addCacheDirective(
-                  PBHelper.convert(request.getInfo()))).build();
+              setId(id).build();
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -1078,7 +1080,8 @@ public class ClientNamenodeProtocolServe
       throws ServiceException {
     try {
       server.modifyCacheDirective(
-          PBHelper.convert(request.getInfo()));
+          PBHelper.convert(request.getInfo()),
+          PBHelper.convertCacheFlags(request.getCacheFlags()));
       return ModifyCacheDirectiveResponseProto.newBuilder().build();
     } catch (IOException e) {
       throw new ServiceException(e);

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Fri Dec 20 01:01:18 2013
@@ -21,11 +21,13 @@ import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -1012,24 +1014,32 @@ public class ClientNamenodeProtocolTrans
   }
 
   @Override
-  public long addCacheDirective(
-      CacheDirectiveInfo directive) throws IOException {
+  public long addCacheDirective(CacheDirectiveInfo directive,
+      EnumSet<CacheFlag> flags) throws IOException {
     try {
-      return rpcProxy.addCacheDirective(null, 
-              AddCacheDirectiveRequestProto.newBuilder().
-                  setInfo(PBHelper.convert(directive)).build()).getId();
+      AddCacheDirectiveRequestProto.Builder builder =
+          AddCacheDirectiveRequestProto.newBuilder().
+              setInfo(PBHelper.convert(directive));
+      if (!flags.isEmpty()) {
+        builder.setCacheFlags(PBHelper.convertCacheFlags(flags));
+      }
+      return rpcProxy.addCacheDirective(null, builder.build()).getId();
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
   }
 
   @Override
-  public void modifyCacheDirective(
-      CacheDirectiveInfo directive) throws IOException {
+  public void modifyCacheDirective(CacheDirectiveInfo directive,
+      EnumSet<CacheFlag> flags) throws IOException {
     try {
-      rpcProxy.modifyCacheDirective(null,
+      ModifyCacheDirectiveRequestProto.Builder builder =
           ModifyCacheDirectiveRequestProto.newBuilder().
-              setInfo(PBHelper.convert(directive)).build());
+              setInfo(PBHelper.convert(directive));
+      if (!flags.isEmpty()) {
+        builder.setCacheFlags(PBHelper.convertCacheFlags(flags));
+      }
+      rpcProxy.modifyCacheDirective(null, builder.build());
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Dec 20 01:01:18 2013
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveStatsProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheFlagProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto;
@@ -1205,7 +1207,7 @@ public class PBHelper {
     return value;
   }
   
-  public static EnumSetWritable<CreateFlag> convert(int flag) {
+  public static EnumSetWritable<CreateFlag> convertCreateFlag(int flag) {
     EnumSet<CreateFlag> result = 
        EnumSet.noneOf(CreateFlag.class);   
     if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) {
@@ -1220,7 +1222,23 @@ public class PBHelper {
     }
     return new EnumSetWritable<CreateFlag>(result);
   }
-  
+
+  public static int convertCacheFlags(EnumSet<CacheFlag> flags) {
+    int value = 0;
+    if (flags.contains(CacheFlag.FORCE)) {
+      value |= CacheFlagProto.FORCE.getNumber();
+    }
+    return value;
+  }
+
+  public static EnumSet<CacheFlag> convertCacheFlags(int flags) {
+    EnumSet<CacheFlag> result = EnumSet.noneOf(CacheFlag.class);
+    if ((flags & CacheFlagProto.FORCE_VALUE) == CacheFlagProto.FORCE_VALUE) {
+      result.add(CacheFlag.FORCE);
+    }
+    return result;
+  }
+
   public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
     if (fs == null)
       return null;
@@ -1818,8 +1836,8 @@ public class PBHelper {
     if (info.getMode() != null) {
       builder.setMode(info.getMode().toShort());
     }
-    if (info.getWeight() != null) {
-      builder.setWeight(info.getWeight());
+    if (info.getLimit() != null) {
+      builder.setLimit(info.getLimit());
     }
     return builder.build();
   }
@@ -1837,8 +1855,8 @@ public class PBHelper {
     if (proto.hasMode()) {
       info.setMode(new FsPermission((short)proto.getMode()));
     }
-    if (proto.hasWeight()) {
-      info.setWeight(proto.getWeight());
+    if (proto.hasLimit())  {
+      info.setLimit(proto.getLimit());
     }
     return info;
   }
@@ -1847,6 +1865,7 @@ public class PBHelper {
     CachePoolStatsProto.Builder builder = CachePoolStatsProto.newBuilder();
     builder.setBytesNeeded(stats.getBytesNeeded());
     builder.setBytesCached(stats.getBytesCached());
+    builder.setBytesOverlimit(stats.getBytesOverlimit());
     builder.setFilesNeeded(stats.getFilesNeeded());
     builder.setFilesCached(stats.getFilesCached());
     return builder.build();
@@ -1856,6 +1875,7 @@ public class PBHelper {
     CachePoolStats.Builder builder = new CachePoolStats.Builder();
     builder.setBytesNeeded(proto.getBytesNeeded());
     builder.setBytesCached(proto.getBytesCached());
+    builder.setBytesOverlimit(proto.getBytesOverlimit());
     builder.setFilesNeeded(proto.getFilesNeeded());
     builder.setFilesCached(proto.getFilesCached());
     return builder.build();

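The two convertCacheFlags overloads are intended to be inverses over the
defined bits; a sketch of the round trip, assuming the CacheFlagProto
generated for this change:

    import java.util.EnumSet;
    import org.apache.hadoop.fs.CacheFlag;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;

    public class CacheFlagRoundTrip {
      public static void main(String[] args) {
        EnumSet<CacheFlag> flags = EnumSet.of(CacheFlag.FORCE);
        int wire = PBHelper.convertCacheFlags(flags); // bitmask, FORCE bit set
        EnumSet<CacheFlag> decoded = PBHelper.convertCacheFlags(wire);
        assert decoded.equals(flags);
        // An unset optional field reads as 0, which decodes to the empty set,
        // so requests from older clients simply carry no flags.
        assert PBHelper.convertCacheFlags(0).isEmpty();
      }
    }
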
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Dec 20 01:01:18 2013
@@ -2893,6 +2893,7 @@ assert storedBlock.findDatanode(dn) < 0 
    */
   boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
     boolean status = false;
+    boolean firstReplicationLog = true;
     int underReplicatedBlocks = 0;
     int decommissionOnlyReplicas = 0;
     int underReplicatedInOpenFiles = 0;
@@ -2907,10 +2908,17 @@ assert storedBlock.findDatanode(dn) < 0 
         int curExpectedReplicas = getReplication(block);
         if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
           if (curExpectedReplicas > curReplicas) {
-            //Log info about one block for this node which needs replication
+            // Log info about one block for this node which needs replication
             if (!status) {
               status = true;
-              logBlockReplicationInfo(block, srcNode, num);
+              if (firstReplicationLog) {
+                logBlockReplicationInfo(block, srcNode, num);
+              }
+              // Allowing decommission as long as default replication is met
+              if (curReplicas >= defaultReplication) {
+                status = false;
+                firstReplicationLog = false;
+              }
             }
             underReplicatedBlocks++;
             if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {

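The intent of the new branch, restated in isolation: an under-replicated
block only holds up decommission (and is only logged once per node) while it
has fewer live replicas than the default replication. A hedged paraphrase of
the per-block test, not the actual method:

    public class DecommissionCheckSketch {
      // Replication still counts as "in progress" for decommissioning only
      // when the block needs replication and has not yet reached the
      // default replication factor.
      static boolean blocksDecommission(int curReplicas,
          int curExpectedReplicas, int defaultReplication) {
        return curExpectedReplicas > curReplicas
            && curReplicas < defaultReplication;
      }
    }
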
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Fri Dec 20 01:01:18 2013
@@ -27,6 +27,9 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,6 +51,8 @@ import org.apache.hadoop.hdfs.util.ReadO
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Scans the namesystem, scheduling blocks to be cached as appropriate.
  *
@@ -79,25 +84,52 @@ public class CacheReplicationMonitor ext
   private final long intervalMs;
 
   /**
-   * True if we should rescan immediately, regardless of how much time
-   * elapsed since the previous scan.
+   * The CacheReplicationMonitor (CRM) lock. Used to synchronize starting and
+   * waiting for rescan operations.
    */
-  private boolean rescanImmediately;
+  private final ReentrantLock lock = new ReentrantLock();
 
   /**
-   * The monotonic time at which the current scan started.
+   * Notifies the scan thread that an immediate rescan is needed.
    */
-  private long scanTimeMs;
+  private final Condition doRescan = lock.newCondition();
 
   /**
-   * Mark status of the current scan.
+   * Notifies waiting threads that a rescan has finished.
    */
-  private boolean mark = false;
+  private final Condition scanFinished = lock.newCondition();
 
   /**
-   * True if this monitor should terminate.
+   * Whether there are pending CacheManager operations that necessitate a
+   * CacheReplicationMonitor rescan. Protected by the CRM lock.
+   */
+  private boolean needsRescan = true;
+
+  /**
+   * Whether we are currently doing a rescan. Protected by the CRM lock.
+   */
+  private boolean isScanning = false;
+
+  /**
+   * The number of rescans completed. Used to wait for scans to finish.
+   * Protected by the CacheReplicationMonitor lock.
+   */
+  private long scanCount = 0;
+
+  /**
+   * True if this monitor should terminate. Protected by the CRM lock.
+   */
+  private boolean shutdown = false;
+
+  /**
+   * The monotonic time at which the current scan started.
    */
-  private boolean shutdown;
+  private long startTimeMs;
+
+  /**
+   * Mark status of the current scan.
+   */
+  private boolean mark = false;
 
   /**
    * Cache directives found in the previous scan.
@@ -108,7 +140,7 @@ public class CacheReplicationMonitor ext
    * Blocks found in the previous scan.
    */
   private long scannedBlocks;
-  
+
   public CacheReplicationMonitor(FSNamesystem namesystem,
       CacheManager cacheManager, long intervalMs) {
     this.namesystem = namesystem;
@@ -120,41 +152,60 @@ public class CacheReplicationMonitor ext
 
   @Override
   public void run() {
-    shutdown = false;
-    rescanImmediately = true;
-    scanTimeMs = 0;
+    startTimeMs = 0;
     LOG.info("Starting CacheReplicationMonitor with interval " +
              intervalMs + " milliseconds");
     try {
       long curTimeMs = Time.monotonicNow();
       while (true) {
-        synchronized(this) {
+        // Not all of the variables accessed here need the CRM lock, but take
+        // it anyway for simplicity
+        lock.lock();
+        try {
           while (true) {
             if (shutdown) {
               LOG.info("Shutting down CacheReplicationMonitor");
               return;
             }
-            if (rescanImmediately) {
-              LOG.info("Rescanning on request");
-              rescanImmediately = false;
+            if (needsRescan) {
+              LOG.info("Rescanning because of pending operations");
               break;
             }
-            long delta = (scanTimeMs + intervalMs) - curTimeMs;
+            long delta = (startTimeMs + intervalMs) - curTimeMs;
             if (delta <= 0) {
-              LOG.info("Rescanning after " + (curTimeMs - scanTimeMs) +
+              LOG.info("Rescanning after " + (curTimeMs - startTimeMs) +
                   " milliseconds");
               break;
             }
-            this.wait(delta);
+            doRescan.await(delta, TimeUnit.MILLISECONDS);
             curTimeMs = Time.monotonicNow();
           }
+        } finally {
+          lock.unlock();
+        }
+        // Mark scan as started, clear needsRescan
+        lock.lock();
+        try {
+          isScanning = true;
+          needsRescan = false;
+        } finally {
+          lock.unlock();
         }
-        scanTimeMs = curTimeMs;
+        startTimeMs = curTimeMs;
         mark = !mark;
         rescan();
         curTimeMs = Time.monotonicNow();
+        // Retake the CRM lock to update synchronization-related variables
+        lock.lock();
+        try {
+          isScanning = false;
+          scanCount++;
+          scanFinished.signalAll();
+        } finally {
+          lock.unlock();
+        }
         LOG.info("Scanned " + scannedDirectives + " directive(s) and " +
-            scannedBlocks + " block(s) in " + (curTimeMs - scanTimeMs) + " " +
+            scannedBlocks + " block(s) in " + (curTimeMs - startTimeMs) + " " +
             "millisecond(s).");
       }
     } catch (Throwable t) {
@@ -164,15 +215,91 @@ public class CacheReplicationMonitor ext
   }
 
   /**
-   * Kick the monitor thread.
-   * 
-   * If it is sleeping, it will wake up and start scanning.
-   * If it is currently scanning, it will finish the scan and immediately do 
-   * another one.
-   */
-  public synchronized void kick() {
-    rescanImmediately = true;
-    this.notifyAll();
+   * Similar to {@link CacheReplicationMonitor#waitForRescan()}, except it only
+   * waits if there are pending operations that necessitate a rescan as
+   * indicated by {@link #setNeedsRescan()}.
+   * <p>
+   * Note that this call may release the FSN lock, so operations before and
+   * after are not necessarily atomic.
+   */
+  public void waitForRescanIfNeeded() {
+    lock.lock();
+    try {
+      if (!needsRescan) {
+        return;
+      }
+    } finally {
+      lock.unlock();
+    }
+    waitForRescan();
+  }
+
+  /**
+   * Waits for a rescan to complete. This doesn't guarantee consistency with
+   * pending operations, only relative recency, since it will not force a new
+   * rescan if a rescan is already underway.
+   * <p>
+   * Note that this call will release the FSN lock, so operations before and
+   * after are not atomic.
+   */
+  public void waitForRescan() {
+    // Drop the FSN lock temporarily and retake it after we finish waiting
+    // Need to handle both the read lock and the write lock
+    boolean retakeWriteLock = false;
+    if (namesystem.hasWriteLock()) {
+      namesystem.writeUnlock();
+      retakeWriteLock = true;
+    } else if (namesystem.hasReadLock()) {
+      namesystem.readUnlock();
+    } else {
+      // Expected to have at least one of the locks
+      Preconditions.checkState(false,
+          "Need to be holding either the read or write lock");
+    }
+    // try/finally for retaking FSN lock
+    try {
+      lock.lock();
+      // try/finally for releasing CRM lock
+      try {
+        // If no scan is already ongoing, mark the CRM as dirty and kick
+        if (!isScanning) {
+          needsRescan = true;
+          doRescan.signal();
+        }
+        // Wait until the scan finishes and the count advances
+        final long startCount = scanCount;
+        while (startCount >= scanCount) {
+          try {
+            scanFinished.await();
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for CacheReplicationMonitor"
+                + " rescan", e);
+            break;
+          }
+        }
+      } finally {
+        lock.unlock();
+      }
+    } finally {
+      if (retakeWriteLock) {
+        namesystem.writeLock();
+      } else {
+        namesystem.readLock();
+      }
+    }
+  }
+
+  /**
+   * Indicates to the CacheReplicationMonitor that there have been CacheManager
+   * changes that require a rescan.
+   */
+  public void setNeedsRescan() {
+    lock.lock();
+    try {
+      this.needsRescan = true;
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
@@ -180,10 +307,14 @@ public class CacheReplicationMonitor ext
    */
   @Override
   public void close() throws IOException {
-    synchronized(this) {
+    lock.lock();
+    try {
       if (shutdown) return;
       shutdown = true;
-      this.notifyAll();
+      doRescan.signalAll();
+      scanFinished.signalAll();
+    } finally {
+      lock.unlock();
     }
     try {
       if (this.isAlive()) {
@@ -228,12 +359,14 @@ public class CacheReplicationMonitor ext
       // Reset the directive's statistics
       directive.resetStatistics();
       // Skip processing this entry if it has expired
-      LOG.info("Directive expiry is at " + directive.getExpiryTime());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Directive expiry is at " + directive.getExpiryTime());
+      }
       if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Skipping directive id " + directive.getId()
               + " because it has expired (" + directive.getExpiryTime() + ">="
-              + now);
+              + now + ")");
         }
         continue;
       }
@@ -280,15 +413,27 @@ public class CacheReplicationMonitor ext
 
     // Increment the "needed" statistics
     directive.addFilesNeeded(1);
-    long neededTotal = 0;
-    for (BlockInfo blockInfo : blockInfos) {
-      long neededByBlock = 
-          directive.getReplication() * blockInfo.getNumBytes();
-       neededTotal += neededByBlock;
-    }
+    // We don't cache UC blocks, don't add them to the total here
+    long neededTotal = file.computeFileSizeNotIncludingLastUcBlock() *
+        directive.getReplication();
     directive.addBytesNeeded(neededTotal);
 
-    // TODO: Enforce per-pool quotas
+    // The pool's bytesNeeded is incremented as we scan. If the demand
+    // thus far plus the demand of this file would exceed the pool's limit,
+    // do not cache this file.
+    CachePool pool = directive.getPool();
+    if (pool.getBytesNeeded() > pool.getLimit()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Skipping directive id %d file %s because "
+            + "limit of pool %s would be exceeded (%d > %d)",
+            directive.getId(),
+            file.getFullPathName(),
+            pool.getPoolName(),
+            pool.getBytesNeeded(),
+            pool.getLimit()));
+      }
+      return;
+    }
 
     long cachedTotal = 0;
     for (BlockInfo blockInfo : blockInfos) {

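The wait()/notifyAll() pair is replaced by an explicit ReentrantLock with two
Conditions plus a monotonically increasing scan count, so waiters can detect
that a full scan completed after their request. The essential handshake,
reduced to a generic sketch (the class and method names are illustrative, and
the real waitForRescan additionally drops the FSN lock before blocking):

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    class RescanHandshake {
      private final ReentrantLock lock = new ReentrantLock();
      private final Condition doRescan = lock.newCondition();
      private final Condition scanFinished = lock.newCondition();
      private boolean needsRescan = false;
      private long scanCount = 0;

      // Caller side, analogous to setNeedsRescan() + waitForRescan().
      void requestAndAwait() throws InterruptedException {
        lock.lock();
        try {
          needsRescan = true;
          doRescan.signal();
          long start = scanCount;
          while (scanCount <= start) { // wait for a full scan to finish
            scanFinished.await();
          }
        } finally {
          lock.unlock();
        }
      }

      // Scanner side, one iteration of the monitor thread's loop.
      void scanOnce(Runnable rescan) throws InterruptedException {
        lock.lock();
        try {
          while (!needsRescan) {
            doRescan.await();
          }
          needsRescan = false;
        } finally {
          lock.unlock();
        }
        rescan.run(); // the scan itself runs outside the lock
        lock.lock();
        try {
          scanCount++;
          scanFinished.signalAll();
        } finally {
          lock.unlock();
        }
      }
    }
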
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Dec 20 01:01:18 2013
@@ -117,6 +117,18 @@ public class JspHelper {
       return 0;
     }
   }
+  
+  /**
+   * Convenience method for canonicalizing host name.
+   * @param addr name:port or name
+   * @return canonicalized host name
+   */
+  public static String canonicalize(String addr) {
+    // default port 1 is supplied to allow addr without port.
+    // the port will be ignored.
+    return NetUtils.createSocketAddr(addr, 1).getAddress()
+           .getCanonicalHostName();
+  }
 
   /**
    * A helper class that generates the correct URL for different schema.
@@ -124,10 +136,11 @@ public class JspHelper {
    */
   public static final class Url {
     public static String authority(String scheme, DatanodeID d) {
+      String fqdn = canonicalize(d.getIpAddr());
       if (scheme.equals("http")) {
-        return d.getInfoAddr();
+        return fqdn + ":" + d.getInfoPort();
       } else if (scheme.equals("https")) {
-        return d.getInfoSecureAddr();
+        return fqdn + ":" + d.getInfoSecurePort();
       } else {
         throw new IllegalArgumentException("Unknown scheme:" + scheme);
       }

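Usage of the relocated helper, for context (the host names are placeholders):

    import org.apache.hadoop.hdfs.server.common.JspHelper;

    public class CanonicalizeExample {
      public static void main(String[] args) {
        // Both forms resolve to the same canonical host name; the dummy
        // port 1 supplied internally is ignored.
        String a = JspHelper.canonicalize("nn.example.com:8020");
        String b = JspHelper.canonicalize("nn.example.com");
        System.out.println(a.equals(b));
      }
    }

Url.authority() now builds "<fqdn>:<infoPort>" from the canonical name
instead of returning the raw IP-based info address.
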
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Fri Dec 20 01:01:18 2013
@@ -21,10 +21,13 @@ import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
@@ -67,7 +70,29 @@ public class BlockMetadataHeader {
     return checksum;
   }
 
- 
+  /**
+   * Read the header without changing the position of the FileChannel.
+   *
+   * @param fc The FileChannel to read.
+   * @return the Metadata Header.
+   * @throws IOException on error.
+   */
+  public static BlockMetadataHeader preadHeader(FileChannel fc)
+      throws IOException {
+    byte arr[] = new byte[2 + DataChecksum.HEADER_LEN];
+    ByteBuffer buf = ByteBuffer.wrap(arr);
+
+    while (buf.hasRemaining()) {
+      if (fc.read(buf, 0) <= 0) {
+        throw new EOFException("unexpected EOF while reading " +
+            "metadata file header");
+      }
+    }
+    short version = (short)((arr[0] << 8) | (arr[1] & 0xff));
+    DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2);
+    return new BlockMetadataHeader(version, dataChecksum);
+  }
+
   /**
    * This reads all the fields till the beginning of checksum.
    * @param in 

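A usage sketch for the new positional read; the .meta file name is
hypothetical:

    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
    import org.apache.hadoop.util.DataChecksum;

    public class PreadHeaderExample {
      public static void main(String[] args) throws Exception {
        RandomAccessFile raf = new RandomAccessFile("blk_1001.meta", "r");
        try {
          FileChannel fc = raf.getChannel();
          fc.position(512); // pretend another reader is mid-file
          BlockMetadataHeader header = BlockMetadataHeader.preadHeader(fc);
          DataChecksum checksum = header.getChecksum();
          // preadHeader reads at offset 0 without moving the channel position.
          assert fc.position() == 512;
          System.out.println(checksum);
        } finally {
          raf.close();
        }
      }
    }
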
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java Fri Dec 20 01:01:18 2013
@@ -21,8 +21,8 @@ package org.apache.hadoop.hdfs.server.da
  * The caching strategy we should use for an HDFS read or write operation.
  */
 public class CachingStrategy {
-  private Boolean dropBehind; // null = use server defaults
-  private Long readahead; // null = use server defaults
+  private final Boolean dropBehind; // null = use server defaults
+  private final Long readahead; // null = use server defaults
   
   public static CachingStrategy newDefaultStrategy() {
     return new CachingStrategy(null, null);
@@ -32,8 +32,28 @@ public class CachingStrategy {
     return new CachingStrategy(true, null);
   }
 
-  public CachingStrategy duplicate() {
-    return new CachingStrategy(this.dropBehind, this.readahead);
+  public static class Builder {
+    private Boolean dropBehind;
+    private Long readahead;
+
+    public Builder(CachingStrategy prev) {
+      this.dropBehind = prev.dropBehind;
+      this.readahead = prev.readahead;
+    }
+
+    public Builder setDropBehind(Boolean dropBehind) {
+      this.dropBehind = dropBehind;
+      return this;
+    }
+
+    public Builder setReadahead(Long readahead) {
+      this.readahead = readahead;
+      return this;
+    }
+
+    public CachingStrategy build() {
+      return new CachingStrategy(dropBehind, readahead);
+    }
   }
 
   public CachingStrategy(Boolean dropBehind, Long readahead) {
@@ -45,18 +65,10 @@ public class CachingStrategy {
     return dropBehind;
   }
   
-  public void setDropBehind(Boolean dropBehind) {
-    this.dropBehind = dropBehind;
-  }
-  
   public Long getReadahead() {
     return readahead;
   }
 
-  public void setReadahead(Long readahead) {
-    this.readahead = readahead;
-  }
-
   public String toString() {
     return "CachingStrategy(dropBehind=" + dropBehind +
         ", readahead=" + readahead + ")";

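With both fields now final, callers copy-and-modify through the new Builder
instead of mutating a shared instance; a brief sketch:

    import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;

    public class CachingStrategyExample {
      public static void main(String[] args) {
        CachingStrategy base = CachingStrategy.newDefaultStrategy(); // (null, null)
        CachingStrategy tuned = new CachingStrategy.Builder(base)
            .setDropBehind(true)
            .setReadahead(4L * 1024 * 1024) // 4 MiB readahead
            .build();
        // 'base' is unchanged; every CachingStrategy is now immutable.
        System.out.println(base + " -> " + tuned);
      }
    }
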
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Fri Dec 20 01:01:18 2013
@@ -78,18 +78,6 @@ public class DatanodeJspHelper {
   }
 
   /**
-   * Internal convenience method for canonicalizing host name.
-   * @param addr name:port or name 
-   * @return canonicalized host name
-   */
-  private static String canonicalize(String addr) {
-    // default port 1 is supplied to allow addr without port.
-    // the port will be ignored.
-    return NetUtils.createSocketAddr(addr, 1).getAddress()
-           .getCanonicalHostName();
-  }
-
-  /**
    * Get the default chunk size.
    * @param conf the configuration
    * @return the number of bytes to chunk in
@@ -228,7 +216,7 @@ public class DatanodeJspHelper {
       }
     }
     out.print("<br><a href=\"///"
-        + canonicalize(nnAddr) + ":"
+        + JspHelper.canonicalize(nnAddr) + ":"
         + namenodeInfoPort + "/dfshealth.jsp\">Go back to DFS home</a>");
     dfs.close();
   }
@@ -359,7 +347,7 @@ public class DatanodeJspHelper {
     // generate a table and dump the info
     out.println("\n<table>");
     
-    String nnCanonicalName = canonicalize(nnAddr);
+    String nnCanonicalName = JspHelper.canonicalize(nnAddr);
     for (LocatedBlock cur : blocks) {
       out.print("<tr>");
       final String blockidstring = Long.toString(cur.getBlock().getBlockId());

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Fri Dec 20 01:01:18 2013
@@ -27,11 +27,12 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
 
 import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -45,13 +46,16 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirective;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -68,7 +72,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.LightWeightGSet;
@@ -342,6 +346,67 @@ public final class CacheManager {
   }
 
   /**
+   * Throws an exception if the CachePool does not have enough capacity to
+   * cache the given path at the replication factor.
+   *
+   * @param pool CachePool where the path is being cached
+   * @param path Path that is being cached
+   * @param replication Replication factor of the path
+   * @throws InvalidRequestException if the pool does not have enough capacity
+   */
+  private void checkLimit(CachePool pool, String path,
+      short replication) throws InvalidRequestException {
+    CacheDirectiveStats stats = computeNeeded(path, replication);
+    if (pool.getBytesNeeded() + (stats.getBytesNeeded() * replication) > pool
+        .getLimit()) {
+      throw new InvalidRequestException("Caching path " + path + " of size "
+          + stats.getBytesNeeded() / replication + " bytes at replication "
+          + replication + " would exceed pool " + pool.getPoolName()
+          + "'s remaining capacity of "
+          + (pool.getLimit() - pool.getBytesNeeded()) + " bytes.");
+    }
+  }
+
+  /**
+   * Computes the needed number of bytes and files for a path.
+   * @return CacheDirectiveStats describing the needed stats for this path
+   */
+  private CacheDirectiveStats computeNeeded(String path, short replication) {
+    FSDirectory fsDir = namesystem.getFSDirectory();
+    INode node;
+    long requestedBytes = 0;
+    long requestedFiles = 0;
+    CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder();
+    try {
+      node = fsDir.getINode(path);
+    } catch (UnresolvedLinkException e) {
+      // We don't cache through symlinks
+      return builder.build();
+    }
+    if (node == null) {
+      return builder.build();
+    }
+    if (node.isFile()) {
+      requestedFiles = 1;
+      INodeFile file = node.asFile();
+      requestedBytes = file.computeFileSize();
+    } else if (node.isDirectory()) {
+      INodeDirectory dir = node.asDirectory();
+      ReadOnlyList<INode> children = dir.getChildrenList(null);
+      requestedFiles = children.size();
+      for (INode child : children) {
+        if (child.isFile()) {
+          requestedBytes += child.asFile().computeFileSize();
+        }
+      }
+    }
+    return new CacheDirectiveStats.Builder()
+        .setBytesNeeded(requestedBytes)
+        .setFilesNeeded(requestedFiles)
+        .build();
+  }
+
+  /**
    * Get a CacheDirective by ID, validating the ID and that the directive
    * exists.
    */
@@ -384,6 +449,15 @@ public final class CacheManager {
       directivesByPath.put(path, directives);
     }
     directives.add(directive);
+    // Fix up pool stats
+    CacheDirectiveStats stats =
+        computeNeeded(directive.getPath(), directive.getReplication());
+    directive.addBytesNeeded(stats.getBytesNeeded());
+    directive.addFilesNeeded(stats.getFilesNeeded());
+
+    if (monitor != null) {
+      monitor.setNeedsRescan();
+    }
   }
 
   /**
@@ -407,7 +481,7 @@ public final class CacheManager {
   }
 
   public CacheDirectiveInfo addDirective(
-      CacheDirectiveInfo info, FSPermissionChecker pc)
+      CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)
       throws IOException {
     assert namesystem.hasWriteLock();
     CacheDirective directive;
@@ -418,6 +492,14 @@ public final class CacheManager {
       short replication = validateReplication(info, (short)1);
       long expiryTime = validateExpiryTime(info,
           CacheDirectiveInfo.Expiration.EXPIRY_NEVER);
+      // Do quota validation if required
+      if (!flags.contains(CacheFlag.FORCE)) {
+        // Can't kick and wait if caching is disabled
+        if (monitor != null) {
+          monitor.waitForRescan();
+        }
+        checkLimit(pool, path, replication);
+      }
       // All validation passed
       // Add a new entry with the next available ID.
       long id = getNextDirectiveId();
@@ -428,14 +510,11 @@ public final class CacheManager {
       throw e;
     }
     LOG.info("addDirective of " + info + " successful.");
-    if (monitor != null) {
-      monitor.kick();
-    }
     return directive.toInfo();
   }
 
   public void modifyDirective(CacheDirectiveInfo info,
-      FSPermissionChecker pc) throws IOException {
+      FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
     assert namesystem.hasWriteLock();
     String idString =
         (info.getId() == null) ?
@@ -463,6 +542,13 @@ public final class CacheManager {
       if (info.getPool() != null) {
         pool = getCachePool(validatePoolName(info));
         checkWritePermission(pc, pool);
+        if (!flags.contains(CacheFlag.FORCE)) {
+          // Can't kick and wait if caching is disabled
+          if (monitor != null) {
+            monitor.waitForRescan();
+          }
+          checkLimit(pool, path, replication);
+        }
       }
       removeInternal(prevEntry);
       CacheDirective newEntry =
@@ -489,9 +575,18 @@ public final class CacheManager {
     if (directives.size() == 0) {
       directivesByPath.remove(path);
     }
+    // Fix up the stats from removing the pool
+    final CachePool pool = directive.getPool();
+    directive.addBytesNeeded(-directive.getBytesNeeded());
+    directive.addFilesNeeded(-directive.getFilesNeeded());
+
     directivesById.remove(directive.getId());
-    directive.getPool().getDirectiveList().remove(directive);
+    pool.getDirectiveList().remove(directive);
     assert directive.getPool() == null;
+
+    if (monitor != null) {
+      monitor.setNeedsRescan();
+    }
   }
 
   public void removeDirective(long id, FSPermissionChecker pc)
@@ -505,9 +600,6 @@ public final class CacheManager {
       LOG.warn("removeDirective of " + id + " failed: ", e);
       throw e;
     }
-    if (monitor != null) {
-      monitor.kick();
-    }
     LOG.info("removeDirective of " + id + " successful.");
   }
 
@@ -527,6 +619,9 @@ public final class CacheManager {
     if (filter.getReplication() != null) {
       throw new IOException("Filtering by replication is unsupported.");
     }
+    if (monitor != null) {
+      monitor.waitForRescanIfNeeded();
+    }
     ArrayList<CacheDirectiveEntry> replies =
         new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
     int numReplies = 0;
@@ -573,16 +668,22 @@ public final class CacheManager {
   public CachePoolInfo addCachePool(CachePoolInfo info)
       throws IOException {
     assert namesystem.hasWriteLock();
-    CachePoolInfo.validate(info);
-    String poolName = info.getPoolName();
-    CachePool pool = cachePools.get(poolName);
-    if (pool != null) {
-      throw new InvalidRequestException("Cache pool " + poolName
-          + " already exists.");
-    }
-    pool = CachePool.createFromInfoAndDefaults(info);
-    cachePools.put(pool.getPoolName(), pool);
-    LOG.info("Created new cache pool " + pool);
+    CachePool pool;
+    try {
+      CachePoolInfo.validate(info);
+      String poolName = info.getPoolName();
+      pool = cachePools.get(poolName);
+      if (pool != null) {
+        throw new InvalidRequestException("Cache pool " + poolName
+            + " already exists.");
+      }
+      pool = CachePool.createFromInfoAndDefaults(info);
+      cachePools.put(pool.getPoolName(), pool);
+    } catch (IOException e) {
+      LOG.info("addCachePool of " + info + " failed: ", e);
+      throw e;
+    }
+    LOG.info("addCachePool of " + info + " successful.");
     return pool.getInfo(true);
   }
 
@@ -597,42 +698,51 @@ public final class CacheManager {
   public void modifyCachePool(CachePoolInfo info)
       throws IOException {
     assert namesystem.hasWriteLock();
-    CachePoolInfo.validate(info);
-    String poolName = info.getPoolName();
-    CachePool pool = cachePools.get(poolName);
-    if (pool == null) {
-      throw new InvalidRequestException("Cache pool " + poolName
-          + " does not exist.");
-    }
     StringBuilder bld = new StringBuilder();
-    String prefix = "";
-    if (info.getOwnerName() != null) {
-      pool.setOwnerName(info.getOwnerName());
-      bld.append(prefix).
-        append("set owner to ").append(info.getOwnerName());
-      prefix = "; ";
-    }
-    if (info.getGroupName() != null) {
-      pool.setGroupName(info.getGroupName());
-      bld.append(prefix).
-        append("set group to ").append(info.getGroupName());
-      prefix = "; ";
-    }
-    if (info.getMode() != null) {
-      pool.setMode(info.getMode());
-      bld.append(prefix).append("set mode to " + info.getMode());
-      prefix = "; ";
-    }
-    if (info.getWeight() != null) {
-      pool.setWeight(info.getWeight());
-      bld.append(prefix).
-        append("set weight to ").append(info.getWeight());
-      prefix = "; ";
-    }
-    if (prefix.isEmpty()) {
-      bld.append("no changes.");
+    try {
+      CachePoolInfo.validate(info);
+      String poolName = info.getPoolName();
+      CachePool pool = cachePools.get(poolName);
+      if (pool == null) {
+        throw new InvalidRequestException("Cache pool " + poolName
+            + " does not exist.");
+      }
+      String prefix = "";
+      if (info.getOwnerName() != null) {
+        pool.setOwnerName(info.getOwnerName());
+        bld.append(prefix).
+          append("set owner to ").append(info.getOwnerName());
+        prefix = "; ";
+      }
+      if (info.getGroupName() != null) {
+        pool.setGroupName(info.getGroupName());
+        bld.append(prefix).
+          append("set group to ").append(info.getGroupName());
+        prefix = "; ";
+      }
+      if (info.getMode() != null) {
+        pool.setMode(info.getMode());
+        bld.append(prefix).append("set mode to " + info.getMode());
+        prefix = "; ";
+      }
+      if (info.getLimit() != null) {
+        pool.setLimit(info.getLimit());
+        bld.append(prefix).append("set limit to " + info.getLimit());
+        prefix = "; ";
+        // A new limit changes the stats, so mark that a rescan is needed
+        if (monitor != null) {
+          monitor.setNeedsRescan();
+        }
+      }
+      if (prefix.isEmpty()) {
+        bld.append("no changes.");
+      }
+    } catch (IOException e) {
+      LOG.info("modifyCachePool of " + info + " failed: ", e);
+      throw e;
     }
-    LOG.info("modified " + poolName + "; " + bld.toString());
+    LOG.info("modifyCachePool of " + info.getPoolName() + " successful; "
+        + bld.toString());
   }
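
A hypothetical client-side counterpart, reusing the admin handle from the
earlier sketch with org.apache.hadoop.hdfs.protocol.CachePoolInfo imported;
the pool name and limit are made up. Only a limit change marks the monitor
for a rescan, since owner, group, and mode have no effect on cache stats:

    admin.modifyCachePool(new CachePoolInfo("analytics")
        .setLimit(64L * 1024 * 1024 * 1024));   // 64 GiB byte limit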
 
   /**
@@ -646,28 +756,37 @@ public final class CacheManager {
   public void removeCachePool(String poolName)
       throws IOException {
     assert namesystem.hasWriteLock();
-    CachePoolInfo.validateName(poolName);
-    CachePool pool = cachePools.remove(poolName);
-    if (pool == null) {
-      throw new InvalidRequestException(
-          "Cannot remove non-existent cache pool " + poolName);
-    }
-    // Remove all directives in this pool.
-    Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
-    while (iter.hasNext()) {
-      CacheDirective directive = iter.next();
-      directivesByPath.remove(directive.getPath());
-      directivesById.remove(directive.getId());
-      iter.remove();
-    }
-    if (monitor != null) {
-      monitor.kick();
+    try {
+      CachePoolInfo.validateName(poolName);
+      CachePool pool = cachePools.remove(poolName);
+      if (pool == null) {
+        throw new InvalidRequestException(
+            "Cannot remove non-existent cache pool " + poolName);
+      }
+      // Remove all directives in this pool.
+      Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
+      while (iter.hasNext()) {
+        CacheDirective directive = iter.next();
+        directivesByPath.remove(directive.getPath());
+        directivesById.remove(directive.getId());
+        iter.remove();
+      }
+      if (monitor != null) {
+        monitor.setNeedsRescan();
+      }
+    } catch (IOException e) {
+      LOG.info("removeCachePool of " + poolName + " failed: ", e);
+      throw e;
     }
+    LOG.info("removeCachePool of " + poolName + " successful.");
   }
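
From the client side (reusing the admin handle from the earlier sketch, with
a made-up pool name), one call removes the pool and every directive in it
under a single namesystem write lock:

    admin.removeCachePool("analytics");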
 
   public BatchedListEntries<CachePoolEntry>
       listCachePools(FSPermissionChecker pc, String prevKey) {
     assert namesystem.hasReadLock();
+    if (monitor != null) {
+      monitor.waitForRescanIfNeeded();
+    }
     final int NUM_PRE_ALLOCATED_ENTRIES = 16;
     ArrayList<CachePoolEntry> results = 
         new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
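
The per-pool stats surfaced here now include the new limit accounting. A
hypothetical listing fragment, reusing the admin handle from the earlier
sketch plus imports for org.apache.hadoop.fs.RemoteIterator and the
CachePoolEntry/CachePoolStats protocol classes:

    RemoteIterator<CachePoolEntry> pools = admin.listCachePools();
    while (pools.hasNext()) {
      CachePoolEntry entry = pools.next();
      CachePoolStats stats = entry.getStats();
      System.out.println(entry.getInfo().getPoolName()
          + " needed=" + stats.getBytesNeeded()
          + " cached=" + stats.getBytesCached()
          + " overlimit=" + stats.getBytesOverlimit());
    }
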
@@ -782,7 +901,7 @@ public final class CacheManager {
    * @param sdPath path of the storage directory
    * @throws IOException
    */
-  public void saveState(DataOutput out, String sdPath)
+  public void saveState(DataOutputStream out, String sdPath)
       throws IOException {
     out.writeLong(nextDirectiveId);
     savePools(out, sdPath);
@@ -805,7 +924,7 @@ public final class CacheManager {
   /**
    * Save cache pools to fsimage
    */
-  private void savePools(DataOutput out,
+  private void savePools(DataOutputStream out,
       String sdPath) throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = new Step(StepType.CACHE_POOLS, sdPath);
@@ -814,7 +933,7 @@ public final class CacheManager {
     Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
     out.writeInt(cachePools.size());
     for (CachePool pool: cachePools.values()) {
-      pool.getInfo(true).writeTo(out);
+      FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true));
       counter.increment();
     }
     prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -823,7 +942,7 @@ public final class CacheManager {
   /*
    * Save cache entries to fsimage
    */
-  private void saveDirectives(DataOutput out, String sdPath)
+  private void saveDirectives(DataOutputStream out, String sdPath)
       throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
@@ -832,11 +951,7 @@ public final class CacheManager {
     Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
     out.writeInt(directivesById.size());
     for (CacheDirective directive : directivesById.values()) {
-      out.writeLong(directive.getId());
-      Text.writeString(out, directive.getPath());
-      out.writeShort(directive.getReplication());
-      Text.writeString(out, directive.getPool().getPoolName());
-      out.writeLong(directive.getExpiryTime());
+      FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo());
       counter.increment();
     }
     prog.endStep(Phase.SAVING_CHECKPOINT, step);
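
The hand-rolled field writes above move into FSImageSerialization so that
the fsimage writer and its reader evolve together (the DataOutput parameters
narrow to DataOutputStream to match that helper's signatures). An
illustrative round-trip of the shared-helper idea, not the real on-disk
format:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Illustrative only: one class owning both directions keeps write and
    // read in lockstep, which is the point of the delegation above.
    public final class DirectiveRecordSketch {
      static void write(DataOutputStream out, long id, String path,
          short replication, String pool, long expiryMs) throws IOException {
        out.writeLong(id);
        out.writeUTF(path);
        out.writeShort(replication);
        out.writeUTF(pool);
        out.writeLong(expiryMs);
      }

      static void readAndPrint(DataInputStream in) throws IOException {
        System.out.println(in.readLong() + " " + in.readUTF() + " "
            + in.readShort() + " " + in.readUTF() + " " + in.readLong());
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        write(new DataOutputStream(buf), 7L, "/data/t", (short) 1, "p",
            1500000000000L);
        readAndPrint(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
      }
    }
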
@@ -854,7 +969,7 @@ public final class CacheManager {
     prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
     Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
     for (int i = 0; i < numberOfPools; i++) {
-      addCachePool(CachePoolInfo.readFrom(in));
+      addCachePool(FSImageSerialization.readCachePoolInfo(in));
       counter.increment();
     }
     prog.endStep(Phase.LOADING_FSIMAGE, step);
@@ -871,19 +986,17 @@ public final class CacheManager {
     prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
     Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
     for (int i = 0; i < numDirectives; i++) {
-      long directiveId = in.readLong();
-      String path = Text.readString(in);
-      short replication = in.readShort();
-      String poolName = Text.readString(in);
-      long expiryTime = in.readLong();
+      CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
       // Get pool reference by looking it up in the map
+      final String poolName = info.getPool();
       CachePool pool = cachePools.get(poolName);
       if (pool == null) {
         throw new IOException("Directive refers to pool " + poolName +
             ", which does not exist.");
       }
       CacheDirective directive =
-          new CacheDirective(directiveId, path, replication, expiryTime);
+          new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
+              info.getReplication(), info.getExpiration().getAbsoluteMillis());
       boolean addedDirective = pool.getDirectiveList().add(directive);
       assert addedDirective;
       if (directivesById.put(directive.getId(), directive) != null) {

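The load path above rehydrates each in-memory CacheDirective from the
CacheDirectiveInfo form that the image now stores. A sketch of the two
conversions it relies on, with made-up values and assuming the imports from
the first sketch:

    CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
        .setId(7L)
        .setPath(new Path("/data/t"))
        .setReplication((short) 1)
        .setPool("p")
        .setExpiration(
            CacheDirectiveInfo.Expiration.newAbsolute(1500000000000L))
        .build();
    // Path object back to the string the directive stores:
    String path = info.getPath().toUri().getPath();            // "/data/t"
    // Relative-or-absolute expiration down to absolute millis:
    long expiryMs = info.getExpiration().getAbsoluteMillis();
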
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java Fri Dec 20 01:01:18 2013
@@ -49,8 +49,8 @@ import com.google.common.base.Preconditi
 public final class CachePool {
   public static final Log LOG = LogFactory.getLog(CachePool.class);
 
-  public static final int DEFAULT_WEIGHT = 100;
-  
+  public static final long DEFAULT_LIMIT = Long.MAX_VALUE;
+
   @Nonnull
   private final String poolName;
 
@@ -71,7 +71,10 @@ public final class CachePool {
   @Nonnull
   private FsPermission mode;
 
-  private int weight;
+  /**
+   * Maximum number of bytes that can be cached in this pool.
+   */
+  private long limit;
 
   private long bytesNeeded;
   private long bytesCached;
@@ -118,10 +121,10 @@ public final class CachePool {
     }
     FsPermission mode = (info.getMode() == null) ? 
         FsPermission.getCachePoolDefault() : info.getMode();
-    Integer weight = (info.getWeight() == null) ?
-        DEFAULT_WEIGHT : info.getWeight();
+    long limit = info.getLimit() == null ?
+        DEFAULT_LIMIT : info.getLimit();
     return new CachePool(info.getPoolName(),
-        ownerName, groupName, mode, weight);
+        ownerName, groupName, mode, limit);
   }
 
   /**
@@ -131,11 +134,11 @@ public final class CachePool {
   static CachePool createFromInfo(CachePoolInfo info) {
     return new CachePool(info.getPoolName(),
         info.getOwnerName(), info.getGroupName(),
-        info.getMode(), info.getWeight());
+        info.getMode(), info.getLimit());
   }
 
   CachePool(String poolName, String ownerName, String groupName,
-      FsPermission mode, int weight) {
+      FsPermission mode, long limit) {
     Preconditions.checkNotNull(poolName);
     Preconditions.checkNotNull(ownerName);
     Preconditions.checkNotNull(groupName);
@@ -144,7 +147,7 @@ public final class CachePool {
     this.ownerName = ownerName;
     this.groupName = groupName;
     this.mode = new FsPermission(mode);
-    this.weight = weight;
+    this.limit = limit;
   }
 
   public String getPoolName() {
@@ -177,16 +180,16 @@ public final class CachePool {
     this.mode = new FsPermission(mode);
     return this;
   }
-  
-  public int getWeight() {
-    return weight;
+
+  public long getLimit() {
+    return limit;
   }
 
-  public CachePool setWeight(int weight) {
-    this.weight = weight;
+  public CachePool setLimit(long bytes) {
+    this.limit = bytes;
     return this;
   }
-  
+
   /**
    * Get either full or partial information about this CachePool.
    *
@@ -204,7 +207,7 @@ public final class CachePool {
     return info.setOwnerName(ownerName).
         setGroupName(groupName).
         setMode(new FsPermission(mode)).
-        setWeight(weight);
+        setLimit(limit);
   }
 
   /**
@@ -241,6 +244,10 @@ public final class CachePool {
     return bytesCached;
   }
 
+  public long getBytesOverlimit() {
+    return Math.max(bytesNeeded - limit, 0);
+  }
+
   public long getFilesNeeded() {
     return filesNeeded;
   }
@@ -258,6 +265,7 @@ public final class CachePool {
     return new CachePoolStats.Builder().
         setBytesNeeded(bytesNeeded).
         setBytesCached(bytesCached).
+        setBytesOverlimit(getBytesOverlimit()).
         setFilesNeeded(filesNeeded).
         setFilesCached(filesCached).
         build();
@@ -291,7 +299,7 @@ public final class CachePool {
         append(", ownerName:").append(ownerName).
         append(", groupName:").append(groupName).
         append(", mode:").append(mode).
-        append(", weight:").append(weight).
+        append(", limit:").append(limit).
         append(" }").toString();
   }
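
A worked example of the new byte-based accounting (values hypothetical): a
pool limited to 100 MiB whose directives need 150 MiB is 50 MiB over limit,
and the Math.max() clamp keeps the figure at 0 for pools under their limit:

    long limit = 100L * 1024 * 1024;        // pool byte limit
    long bytesNeeded = 150L * 1024 * 1024;  // sum over the pool's directives
    long overlimit = Math.max(bytesNeeded - limit, 0);  // 52428800 (50 MiB)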
 

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Dec 20 01:01:18 2013
@@ -160,10 +160,10 @@ public class FSEditLog implements LogsPu
   private long totalTimeTransactions;  // total time for all transactions
   private NameNodeMetrics metrics;
 
-  private NNStorage storage;
-  private Configuration conf;
+  private final NNStorage storage;
+  private final Configuration conf;
   
-  private List<URI> editsDirs;
+  private final List<URI> editsDirs;
 
   private ThreadLocal<OpInstanceCache> cache =
       new ThreadLocal<OpInstanceCache>() {
@@ -176,7 +176,7 @@ public class FSEditLog implements LogsPu
   /**
    * The edit directories that are shared between primary and secondary.
    */
-  private List<URI> sharedEditsDirs;
+  private final List<URI> sharedEditsDirs;
 
   private static class TransactionId {
     public long txid;
@@ -203,10 +203,6 @@ public class FSEditLog implements LogsPu
    * @param editsDirs List of journals to use
    */
   FSEditLog(Configuration conf, NNStorage storage, List<URI> editsDirs) {
-    init(conf, storage, editsDirs);
-  }
-  
-  private void init(Configuration conf, NNStorage storage, List<URI> editsDirs) {
     isSyncRunning = false;
     this.conf = conf;
     this.storage = storage;
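
The refactor folds init() into the constructor so the fields can be declared
final, which documents that they never change after construction and picks up
the Java memory model's final-field publication guarantees. A sketch with
illustrative names, not FSEditLog's:

    class Before {
      private Object storage;                 // cannot be final
      Before(Object storage) { init(storage); }
      private void init(Object storage) { this.storage = storage; }
    }

    class After {
      private final Object storage;           // assigned once, safely published
      After(Object storage) { this.storage = storage; }
    }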

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1552467&r1=1552466&r2=1552467&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Dec 20 01:01:18 2013
@@ -24,12 +24,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.EnumMap;
+import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -650,7 +652,7 @@ public class FSEditLogLoader {
       ModifyCacheDirectiveInfoOp modifyOp =
           (ModifyCacheDirectiveInfoOp) op;
       fsNamesys.getCacheManager().modifyDirective(
-          modifyOp.directive, null);
+          modifyOp.directive, null, EnumSet.of(CacheFlag.FORCE));
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
       }
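
Replay passes CacheFlag.FORCE because the quota check already ran (or was
deliberately forced) when the operation was first logged; re-checking during
replay could spuriously fail, for example against a pool whose limit a later
edit lowers. A minimal illustration of the flag test CacheManager performs:

    // Replay always skips the quota check:
    EnumSet<CacheFlag> replay = EnumSet.of(CacheFlag.FORCE);
    assert replay.contains(CacheFlag.FORCE);      // checkLimit() skipped

    // An RPC caller passing no flags still gets the check:
    EnumSet<CacheFlag> rpc = EnumSet.noneOf(CacheFlag.class);
    assert !rpc.contains(CacheFlag.FORCE);        // checkLimit() runs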


