Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A447210CA7 for ; Fri, 20 Dec 2013 01:02:02 +0000 (UTC) Received: (qmail 5376 invoked by uid 500); 20 Dec 2013 01:02:02 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 5338 invoked by uid 500); 20 Dec 2013 01:02:02 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 5330 invoked by uid 99); 20 Dec 2013 01:02:02 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Dec 2013 01:02:02 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Dec 2013 01:01:48 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C04CC2388A9B; Fri, 20 Dec 2013 01:01:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1552467 [2/4] - in /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/dev-support/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache/... 
Date: Fri, 20 Dec 2013 01:01:23 -0000 To: hdfs-commits@hadoop.apache.org From: cnauroth@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131220010125.C04CC2388A9B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Fri Dec 20 01:01:18 2013 @@ -25,10 +25,12 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import java.util.EnumSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.hdfs.client.ClientMmap; import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.net.Peer; @@ -455,8 +457,8 @@ public class RemoteBlockReader2 impleme } @Override - public ClientMmap getClientMmap(LocatedBlock curBlock, - ClientMmapManager manager) { + public ClientMmap getClientMmap(EnumSet opts, + ClientMmapManager mmapManager) { return null; } } Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java Fri Dec 20 01:01:18 2013 @@ -19,10 +19,12 @@ package org.apache.hadoop.hdfs.client; import java.io.IOException; import java.net.URI; +import java.util.EnumSet; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -131,25 +133,26 @@ public class HdfsAdmin { * Add a new CacheDirectiveInfo. * * @param info Information about a directive to add. + * @param flags {@link CacheFlag}s to use for this operation. * @return the ID of the directive that was created. * @throws IOException if the directive could not be added */ - public long addCacheDirective(CacheDirectiveInfo info) - throws IOException { - return dfs.addCacheDirective(info); + public long addCacheDirective(CacheDirectiveInfo info, + EnumSet flags) throws IOException { + return dfs.addCacheDirective(info, flags); } /** * Modify a CacheDirective. * - * @param info Information about the directive to modify. - * You must set the ID to indicate which CacheDirective you want - * to modify. + * @param info Information about the directive to modify. You must set the ID + * to indicate which CacheDirective you want to modify. + * @param flags {@link CacheFlag}s to use for this operation. 
* @throws IOException if the directive could not be modified */ - public void modifyCacheDirective(CacheDirectiveInfo info) - throws IOException { - dfs.modifyCacheDirective(info); + public void modifyCacheDirective(CacheDirectiveInfo info, + EnumSet flags) throws IOException { + dfs.modifyCacheDirective(info, flags); } /** Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java Fri Dec 20 01:01:18 2013 @@ -18,8 +18,6 @@ package org.apache.hadoop.hdfs.protocol; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import javax.annotation.Nullable; @@ -32,14 +30,6 @@ import org.apache.hadoop.classification. import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; -import org.apache.hadoop.hdfs.util.XMLUtils; -import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException; -import org.apache.hadoop.hdfs.util.XMLUtils.Stanza; -import org.apache.hadoop.io.Text; -import org.xml.sax.ContentHandler; -import org.xml.sax.SAXException; /** * CachePoolInfo describes a cache pool. 
@@ -64,7 +54,7 @@ public class CachePoolInfo { FsPermission mode; @Nullable - Integer weight; + Long limit; public CachePoolInfo(String poolName) { this.poolName = poolName; @@ -101,12 +91,12 @@ public class CachePoolInfo { return this; } - public Integer getWeight() { - return weight; + public Long getLimit() { + return limit; } - public CachePoolInfo setWeight(Integer weight) { - this.weight = weight; + public CachePoolInfo setLimit(Long bytes) { + this.limit = bytes; return this; } @@ -117,7 +107,7 @@ public class CachePoolInfo { append(", groupName:").append(groupName). append(", mode:").append((mode == null) ? "null" : String.format("0%03o", mode.toShort())). - append(", weight:").append(weight). + append(", limit:").append(limit). append("}").toString(); } @@ -134,7 +124,7 @@ public class CachePoolInfo { append(ownerName, other.ownerName). append(groupName, other.groupName). append(mode, other.mode). - append(weight, other.weight). + append(limit, other.limit). isEquals(); } @@ -145,7 +135,7 @@ public class CachePoolInfo { append(ownerName). append(groupName). append(mode). - append(weight). + append(limit). 
hashCode(); } @@ -153,8 +143,8 @@ public class CachePoolInfo { if (info == null) { throw new InvalidRequestException("CachePoolInfo is null"); } - if ((info.getWeight() != null) && (info.getWeight() < 0)) { - throw new InvalidRequestException("CachePool weight is negative."); + if ((info.getLimit() != null) && (info.getLimit() < 0)) { + throw new InvalidRequestException("Limit is negative."); } validateName(info.poolName); } @@ -167,66 +157,4 @@ public class CachePoolInfo { throw new IOException("invalid empty cache pool name"); } } - - public static CachePoolInfo readFrom(DataInput in) throws IOException { - String poolName = Text.readString(in); - CachePoolInfo info = new CachePoolInfo(poolName); - if (in.readBoolean()) { - info.setOwnerName(Text.readString(in)); - } - if (in.readBoolean()) { - info.setGroupName(Text.readString(in)); - } - if (in.readBoolean()) { - info.setMode(FsPermission.read(in)); - } - if (in.readBoolean()) { - info.setWeight(in.readInt()); - } - return info; - } - - public void writeTo(DataOutput out) throws IOException { - Text.writeString(out, poolName); - boolean hasOwner, hasGroup, hasMode, hasWeight; - hasOwner = ownerName != null; - hasGroup = groupName != null; - hasMode = mode != null; - hasWeight = weight != null; - out.writeBoolean(hasOwner); - if (hasOwner) { - Text.writeString(out, ownerName); - } - out.writeBoolean(hasGroup); - if (hasGroup) { - Text.writeString(out, groupName); - } - out.writeBoolean(hasMode); - if (hasMode) { - mode.write(out); - } - out.writeBoolean(hasWeight); - if (hasWeight) { - out.writeInt(weight); - } - } - - public void writeXmlTo(ContentHandler contentHandler) throws SAXException { - XMLUtils.addSaxString(contentHandler, "POOLNAME", poolName); - PermissionStatus perm = new PermissionStatus(ownerName, - groupName, mode); - FSEditLogOp.permissionStatusToXml(contentHandler, perm); - XMLUtils.addSaxString(contentHandler, "WEIGHT", Integer.toString(weight)); - } - - public static CachePoolInfo 
readXmlFrom(Stanza st) throws InvalidXmlException { - String poolName = st.getValue("POOLNAME"); - PermissionStatus perm = FSEditLogOp.permissionStatusFromXml(st); - int weight = Integer.parseInt(st.getValue("WEIGHT")); - return new CachePoolInfo(poolName). - setOwnerName(perm.getUserName()). - setGroupName(perm.getGroupName()). - setMode(perm.getPermission()). - setWeight(weight); - } } Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java Fri Dec 20 01:01:18 2013 @@ -30,6 +30,7 @@ public class CachePoolStats { public static class Builder { private long bytesNeeded; private long bytesCached; + private long bytesOverlimit; private long filesNeeded; private long filesCached; @@ -46,6 +47,11 @@ public class CachePoolStats { return this; } + public Builder setBytesOverlimit(long bytesOverlimit) { + this.bytesOverlimit = bytesOverlimit; + return this; + } + public Builder setFilesNeeded(long filesNeeded) { this.filesNeeded = filesNeeded; return this; @@ -57,20 +63,22 @@ public class CachePoolStats { } public CachePoolStats build() { - return new CachePoolStats(bytesNeeded, bytesCached, filesNeeded, - filesCached); + return new CachePoolStats(bytesNeeded, bytesCached, bytesOverlimit, + filesNeeded, filesCached); } }; private final long bytesNeeded; private final long bytesCached; + private final long bytesOverlimit; private 
final long filesNeeded; private final long filesCached; - private CachePoolStats(long bytesNeeded, long bytesCached, long filesNeeded, - long filesCached) { + private CachePoolStats(long bytesNeeded, long bytesCached, + long bytesOverlimit, long filesNeeded, long filesCached) { this.bytesNeeded = bytesNeeded; this.bytesCached = bytesCached; + this.bytesOverlimit = bytesOverlimit; this.filesNeeded = filesNeeded; this.filesCached = filesCached; } @@ -83,6 +91,10 @@ public class CachePoolStats { return bytesCached; } + public long getBytesOverlimit() { + return bytesOverlimit; + } + public long getFilesNeeded() { return filesNeeded; } @@ -95,6 +107,7 @@ public class CachePoolStats { return new StringBuilder().append("{"). append("bytesNeeded:").append(bytesNeeded). append(", bytesCached:").append(bytesCached). + append(", bytesOverlimit:").append(bytesOverlimit). append(", filesNeeded:").append(filesNeeded). append(", filesCached:").append(filesCached). append("}").toString(); Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Dec 20 01:01:18 2013 @@ -19,10 +19,12 @@ package org.apache.hadoop.hdfs.protocol; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.EnumSet; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import 
org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -1103,23 +1105,24 @@ public interface ClientProtocol { * Add a CacheDirective to the CacheManager. * * @param directive A CacheDirectiveInfo to be added + * @param flags {@link CacheFlag}s to use for this operation. * @return A CacheDirectiveInfo associated with the added directive * @throws IOException if the directive could not be added */ @AtMostOnce - public long addCacheDirective( - CacheDirectiveInfo directive) throws IOException; + public long addCacheDirective(CacheDirectiveInfo directive, + EnumSet flags) throws IOException; /** * Modify a CacheDirective in the CacheManager. * - * @return directive The directive to modify. Must contain - * a directive ID. + * @return directive The directive to modify. Must contain a directive ID. + * @param flags {@link CacheFlag}s to use for this operation. * @throws IOException if the directive could not be modified */ @AtMostOnce - public void modifyCacheDirective( - CacheDirectiveInfo directive) throws IOException; + public void modifyCacheDirective(CacheDirectiveInfo directive, + EnumSet flags) throws IOException; /** * Remove a CacheDirectiveInfo from the CacheManager. 
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Fri Dec 20 01:01:18 2013 @@ -350,7 +350,7 @@ public class ClientNamenodeProtocolServe try { HdfsFileStatus result = server.create(req.getSrc(), PBHelper.convert(req.getMasked()), req.getClientName(), - PBHelper.convert(req.getCreateFlag()), req.getCreateParent(), + PBHelper.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(), (short) req.getReplication(), req.getBlockSize()); if (result != null) { @@ -1064,9 +1064,11 @@ public class ClientNamenodeProtocolServe RpcController controller, AddCacheDirectiveRequestProto request) throws ServiceException { try { + long id = server.addCacheDirective( + PBHelper.convert(request.getInfo()), + PBHelper.convertCacheFlags(request.getCacheFlags())); return AddCacheDirectiveResponseProto.newBuilder(). 
- setId(server.addCacheDirective( - PBHelper.convert(request.getInfo()))).build(); + setId(id).build(); } catch (IOException e) { throw new ServiceException(e); } @@ -1078,7 +1080,8 @@ public class ClientNamenodeProtocolServe throws ServiceException { try { server.modifyCacheDirective( - PBHelper.convert(request.getInfo())); + PBHelper.convert(request.getInfo()), + PBHelper.convertCacheFlags(request.getCacheFlags())); return ModifyCacheDirectiveResponseProto.newBuilder().build(); } catch (IOException e) { throw new ServiceException(e); Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Fri Dec 20 01:01:18 2013 @@ -21,11 +21,13 @@ import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; +import java.util.EnumSet; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -1012,24 +1014,32 @@ public class ClientNamenodeProtocolTrans } @Override - 
public long addCacheDirective( - CacheDirectiveInfo directive) throws IOException { + public long addCacheDirective(CacheDirectiveInfo directive, + EnumSet flags) throws IOException { try { - return rpcProxy.addCacheDirective(null, - AddCacheDirectiveRequestProto.newBuilder(). - setInfo(PBHelper.convert(directive)).build()).getId(); + AddCacheDirectiveRequestProto.Builder builder = + AddCacheDirectiveRequestProto.newBuilder(). + setInfo(PBHelper.convert(directive)); + if (!flags.isEmpty()) { + builder.setCacheFlags(PBHelper.convertCacheFlags(flags)); + } + return rpcProxy.addCacheDirective(null, builder.build()).getId(); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } } @Override - public void modifyCacheDirective( - CacheDirectiveInfo directive) throws IOException { + public void modifyCacheDirective(CacheDirectiveInfo directive, + EnumSet flags) throws IOException { try { - rpcProxy.modifyCacheDirective(null, + ModifyCacheDirectiveRequestProto.Builder builder = ModifyCacheDirectiveRequestProto.newBuilder(). 
- setInfo(PBHelper.convert(directive)).build()); + setInfo(PBHelper.convert(directive)); + if (!flags.isEmpty()) { + builder.setCacheFlags(PBHelper.convertCacheFlags(flags)); + } + rpcProxy.modifyCacheDirective(null, builder.build()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Dec 20 01:01:18 2013 @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; +import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FsServerDefaults; @@ -75,6 +76,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveStatsProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheFlagProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto; import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto; @@ -1205,7 +1207,7 @@ public class PBHelper { return value; } - public static EnumSetWritable convert(int flag) { + public static EnumSetWritable convertCreateFlag(int flag) { EnumSet result = EnumSet.noneOf(CreateFlag.class); if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) { @@ -1220,7 +1222,23 @@ public class PBHelper { } return new EnumSetWritable(result); } - + + public static int convertCacheFlags(EnumSet flags) { + int value = 0; + if (flags.contains(CacheFlag.FORCE)) { + value |= CacheFlagProto.FORCE.getNumber(); + } + return value; + } + + public static EnumSet convertCacheFlags(int flags) { + EnumSet result = EnumSet.noneOf(CacheFlag.class); + if ((flags & CacheFlagProto.FORCE_VALUE) == CacheFlagProto.FORCE_VALUE) { + result.add(CacheFlag.FORCE); + } + return result; + } + public static HdfsFileStatus convert(HdfsFileStatusProto fs) { if (fs == null) return null; @@ -1818,8 +1836,8 @@ public class PBHelper { if (info.getMode() != null) { builder.setMode(info.getMode().toShort()); } - if (info.getWeight() != null) { - builder.setWeight(info.getWeight()); + if (info.getLimit() != null) { + builder.setLimit(info.getLimit()); } return builder.build(); } @@ -1837,8 +1855,8 @@ public class PBHelper { if (proto.hasMode()) { info.setMode(new FsPermission((short)proto.getMode())); } - if (proto.hasWeight()) { - info.setWeight(proto.getWeight()); + if (proto.hasLimit()) { + info.setLimit(proto.getLimit()); } return info; } @@ -1847,6 +1865,7 @@ public class PBHelper { CachePoolStatsProto.Builder builder = CachePoolStatsProto.newBuilder(); builder.setBytesNeeded(stats.getBytesNeeded()); builder.setBytesCached(stats.getBytesCached()); + builder.setBytesOverlimit(stats.getBytesOverlimit()); builder.setFilesNeeded(stats.getFilesNeeded()); builder.setFilesCached(stats.getFilesCached()); return builder.build(); @@ -1856,6 +1875,7 @@ public class PBHelper { 
CachePoolStats.Builder builder = new CachePoolStats.Builder(); builder.setBytesNeeded(proto.getBytesNeeded()); builder.setBytesCached(proto.getBytesCached()); + builder.setBytesOverlimit(proto.getBytesOverlimit()); builder.setFilesNeeded(proto.getFilesNeeded()); builder.setFilesCached(proto.getFilesCached()); return builder.build(); Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Dec 20 01:01:18 2013 @@ -2893,6 +2893,7 @@ assert storedBlock.findDatanode(dn) < 0 */ boolean isReplicationInProgress(DatanodeDescriptor srcNode) { boolean status = false; + boolean firstReplicationLog = true; int underReplicatedBlocks = 0; int decommissionOnlyReplicas = 0; int underReplicatedInOpenFiles = 0; @@ -2907,10 +2908,17 @@ assert storedBlock.findDatanode(dn) < 0 int curExpectedReplicas = getReplication(block); if (isNeededReplication(block, curExpectedReplicas, curReplicas)) { if (curExpectedReplicas > curReplicas) { - //Log info about one block for this node which needs replication + // Log info about one block for this node which needs replication if (!status) { status = true; - logBlockReplicationInfo(block, srcNode, num); + if (firstReplicationLog) { + logBlockReplicationInfo(block, srcNode, num); + } + // Allowing decommission as long as default replication is met + if 
(curReplicas >= defaultReplication) { + status = false; + firstReplicationLog = false; + } } underReplicatedBlocks++; if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) { Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Fri Dec 20 01:01:18 2013 @@ -27,6 +27,9 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,6 +51,8 @@ import org.apache.hadoop.hdfs.util.ReadO import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.Time; +import com.google.common.base.Preconditions; + /** * Scans the namesystem, scheduling blocks to be cached as appropriate. * @@ -79,25 +84,52 @@ public class CacheReplicationMonitor ext private final long intervalMs; /** - * True if we should rescan immediately, regardless of how much time - * elapsed since the previous scan. + * The CacheReplicationMonitor (CRM) lock. Used to synchronize starting and + * waiting for rescan operations. 
*/ - private boolean rescanImmediately; + private final ReentrantLock lock = new ReentrantLock(); /** - * The monotonic time at which the current scan started. + * Notifies the scan thread that an immediate rescan is needed. */ - private long scanTimeMs; + private final Condition doRescan = lock.newCondition(); /** - * Mark status of the current scan. + * Notifies waiting threads that a rescan has finished. */ - private boolean mark = false; + private final Condition scanFinished = lock.newCondition(); /** - * True if this monitor should terminate. + * Whether there are pending CacheManager operations that necessitate a + * CacheReplicationMonitor rescan. Protected by the CRM lock. + */ + private boolean needsRescan = true; + + /** + * Whether we are currently doing a rescan. Protected by the CRM lock. + */ + private boolean isScanning = false; + + /** + * The number of rescans completed. Used to wait for scans to finish. + * Protected by the CacheReplicationMonitor lock. + */ + private long scanCount = 0; + + /** + * True if this monitor should terminate. Protected by the CRM lock. + */ + private boolean shutdown = false; + + /** + * The monotonic time at which the current scan started. */ - private boolean shutdown; + private long startTimeMs; + + /** + * Mark status of the current scan. + */ + private boolean mark = false; /** * Cache directives found in the previous scan. @@ -108,7 +140,7 @@ public class CacheReplicationMonitor ext * Blocks found in the previous scan. 
*/ private long scannedBlocks; - + public CacheReplicationMonitor(FSNamesystem namesystem, CacheManager cacheManager, long intervalMs) { this.namesystem = namesystem; @@ -120,41 +152,60 @@ public class CacheReplicationMonitor ext @Override public void run() { - shutdown = false; - rescanImmediately = true; - scanTimeMs = 0; + startTimeMs = 0; LOG.info("Starting CacheReplicationMonitor with interval " + intervalMs + " milliseconds"); try { long curTimeMs = Time.monotonicNow(); while (true) { - synchronized(this) { + // Not all of the variables accessed here need the CRM lock, but take + // it anyway for simplicity + lock.lock(); + try { while (true) { if (shutdown) { LOG.info("Shutting down CacheReplicationMonitor"); return; } - if (rescanImmediately) { - LOG.info("Rescanning on request"); - rescanImmediately = false; + if (needsRescan) { + LOG.info("Rescanning because of pending operations"); break; } - long delta = (scanTimeMs + intervalMs) - curTimeMs; + long delta = (startTimeMs + intervalMs) - curTimeMs; if (delta <= 0) { - LOG.info("Rescanning after " + (curTimeMs - scanTimeMs) + + LOG.info("Rescanning after " + (curTimeMs - startTimeMs) + " milliseconds"); break; } - this.wait(delta); + doRescan.await(delta, TimeUnit.MILLISECONDS); curTimeMs = Time.monotonicNow(); } + } finally { + lock.unlock(); + } + // Mark scan as started, clear needsRescan + lock.lock(); + try { + isScanning = true; + needsRescan = false; + } finally { + lock.unlock(); } - scanTimeMs = curTimeMs; + startTimeMs = curTimeMs; mark = !mark; rescan(); curTimeMs = Time.monotonicNow(); + // Retake the CRM lock to update synchronization-related variables + lock.lock(); + try { + isScanning = false; + scanCount++; + scanFinished.signalAll(); + } finally { + lock.unlock(); + } LOG.info("Scanned " + scannedDirectives + " directive(s) and " + - scannedBlocks + " block(s) in " + (curTimeMs - scanTimeMs) + " " + + scannedBlocks + " block(s) in " + (curTimeMs - startTimeMs) + " " + "millisecond(s)."); 
} } catch (Throwable t) { @@ -164,15 +215,91 @@ public class CacheReplicationMonitor ext } /** - * Kick the monitor thread. - * - * If it is sleeping, it will wake up and start scanning. - * If it is currently scanning, it will finish the scan and immediately do - * another one. - */ - public synchronized void kick() { - rescanImmediately = true; - this.notifyAll(); + * Similar to {@link CacheReplicationMonitor#waitForRescan()}, except it only + * waits if there are pending operations that necessitate a rescan as + * indicated by {@link #setNeedsRescan()}. + *
<p>
+ * Note that this call may release the FSN lock, so operations before and + * after are not necessarily atomic. + */ + public void waitForRescanIfNeeded() { + lock.lock(); + try { + if (!needsRescan) { + return; + } + } finally { + lock.unlock(); + } + waitForRescan(); + } + + /** + * Waits for a rescan to complete. This doesn't guarantee consistency with + * pending operations, only relative recency, since it will not force a new + * rescan if a rescan is already underway. + *
<p>
+ * Note that this call will release the FSN lock, so operations before and + * after are not atomic. + */ + public void waitForRescan() { + // Drop the FSN lock temporarily and retake it after we finish waiting + // Need to handle both the read lock and the write lock + boolean retakeWriteLock = false; + if (namesystem.hasWriteLock()) { + namesystem.writeUnlock(); + retakeWriteLock = true; + } else if (namesystem.hasReadLock()) { + namesystem.readUnlock(); + } else { + // Expected to have at least one of the locks + Preconditions.checkState(false, + "Need to be holding either the read or write lock"); + } + // try/finally for retaking FSN lock + try { + lock.lock(); + // try/finally for releasing CRM lock + try { + // If no scan is already ongoing, mark the CRM as dirty and kick + if (!isScanning) { + needsRescan = true; + doRescan.signal(); + } + // Wait until the scan finishes and the count advances + final long startCount = scanCount; + while (startCount >= scanCount) { + try { + scanFinished.await(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for CacheReplicationMonitor" + + " rescan", e); + break; + } + } + } finally { + lock.unlock(); + } + } finally { + if (retakeWriteLock) { + namesystem.writeLock(); + } else { + namesystem.readLock(); + } + } + } + + /** + * Indicates to the CacheReplicationMonitor that there have been CacheManager + * changes that require a rescan. 
+ */ + public void setNeedsRescan() { + lock.lock(); + try { + this.needsRescan = true; + } finally { + lock.unlock(); + } } /** @@ -180,10 +307,14 @@ public class CacheReplicationMonitor ext */ @Override public void close() throws IOException { - synchronized(this) { + lock.lock(); + try { if (shutdown) return; shutdown = true; - this.notifyAll(); + doRescan.signalAll(); + scanFinished.signalAll(); + } finally { + lock.unlock(); } try { if (this.isAlive()) { @@ -228,12 +359,14 @@ public class CacheReplicationMonitor ext // Reset the directive's statistics directive.resetStatistics(); // Skip processing this entry if it has expired - LOG.info("Directive expiry is at " + directive.getExpiryTime()); + if (LOG.isTraceEnabled()) { + LOG.trace("Directive expiry is at " + directive.getExpiryTime()); + } if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) { if (LOG.isDebugEnabled()) { LOG.debug("Skipping directive id " + directive.getId() + " because it has expired (" + directive.getExpiryTime() + ">=" - + now); + + now + ")"); } continue; } @@ -280,15 +413,27 @@ public class CacheReplicationMonitor ext // Increment the "needed" statistics directive.addFilesNeeded(1); - long neededTotal = 0; - for (BlockInfo blockInfo : blockInfos) { - long neededByBlock = - directive.getReplication() * blockInfo.getNumBytes(); - neededTotal += neededByBlock; - } + // We don't cache UC blocks, don't add them to the total here + long neededTotal = file.computeFileSizeNotIncludingLastUcBlock() * + directive.getReplication(); directive.addBytesNeeded(neededTotal); - // TODO: Enforce per-pool quotas + // The pool's bytesNeeded is incremented as we scan. If the demand + // thus far plus the demand of this file would exceed the pool's limit, + // do not cache this file. 
+ CachePool pool = directive.getPool(); + if (pool.getBytesNeeded() > pool.getLimit()) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Skipping directive id %d file %s because " + + "limit of pool %s would be exceeded (%d > %d)", + directive.getId(), + file.getFullPathName(), + pool.getPoolName(), + pool.getBytesNeeded(), + pool.getLimit())); + } + return; + } long cachedTotal = 0; for (BlockInfo blockInfo : blockInfos) { Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Dec 20 01:01:18 2013 @@ -117,6 +117,18 @@ public class JspHelper { return 0; } } + + /** + * convenience method for canonicalizing host name. + * @param addr name:port or name + * @return canonicalized host name + */ + public static String canonicalize(String addr) { + // default port 1 is supplied to allow addr without port. + // the port will be ignored. + return NetUtils.createSocketAddr(addr, 1).getAddress() + .getCanonicalHostName(); + } /** * A helper class that generates the correct URL for different schema. 
@@ -124,10 +136,11 @@ public class JspHelper { */ public static final class Url { public static String authority(String scheme, DatanodeID d) { + String fqdn = canonicalize(d.getIpAddr()); if (scheme.equals("http")) { - return d.getInfoAddr(); + return fqdn + ":" + d.getInfoPort(); } else if (scheme.equals("https")) { - return d.getInfoSecureAddr(); + return fqdn + ":" + d.getInfoSecurePort(); } else { throw new IllegalArgumentException("Unknown scheme:" + scheme); } Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Fri Dec 20 01:01:18 2013 @@ -21,10 +21,13 @@ import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; @@ -67,7 +70,29 @@ public class BlockMetadataHeader { return checksum; } - + /** + * Read the header without changing the position of the FileChannel. + * + * @param fc The FileChannel to read. + * @return the Metadata Header. + * @throws IOException on error. 
+ */ + public static BlockMetadataHeader preadHeader(FileChannel fc) + throws IOException { + byte arr[] = new byte[2 + DataChecksum.HEADER_LEN]; + ByteBuffer buf = ByteBuffer.wrap(arr); + + while (buf.hasRemaining()) { + if (fc.read(buf, 0) <= 0) { + throw new EOFException("unexpected EOF while reading " + + "metadata file header"); + } + } + short version = (short)((arr[0] << 8) | (arr[1] & 0xff)); + DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2); + return new BlockMetadataHeader(version, dataChecksum); + } + /** * This reads all the fields till the beginning of checksum. * @param in Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java Fri Dec 20 01:01:18 2013 @@ -21,8 +21,8 @@ package org.apache.hadoop.hdfs.server.da * The caching strategy we should use for an HDFS read or write operation. 
*/ public class CachingStrategy { - private Boolean dropBehind; // null = use server defaults - private Long readahead; // null = use server defaults + private final Boolean dropBehind; // null = use server defaults + private final Long readahead; // null = use server defaults public static CachingStrategy newDefaultStrategy() { return new CachingStrategy(null, null); @@ -32,8 +32,28 @@ public class CachingStrategy { return new CachingStrategy(true, null); } - public CachingStrategy duplicate() { - return new CachingStrategy(this.dropBehind, this.readahead); + public static class Builder { + private Boolean dropBehind; + private Long readahead; + + public Builder(CachingStrategy prev) { + this.dropBehind = prev.dropBehind; + this.readahead = prev.readahead; + } + + public Builder setDropBehind(Boolean dropBehind) { + this.dropBehind = dropBehind; + return this; + } + + public Builder setReadahead(Long readahead) { + this.readahead = readahead; + return this; + } + + public CachingStrategy build() { + return new CachingStrategy(dropBehind, readahead); + } } public CachingStrategy(Boolean dropBehind, Long readahead) { @@ -45,18 +65,10 @@ public class CachingStrategy { return dropBehind; } - public void setDropBehind(Boolean dropBehind) { - this.dropBehind = dropBehind; - } - public Long getReadahead() { return readahead; } - public void setReadahead(Long readahead) { - this.readahead = readahead; - } - public String toString() { return "CachingStrategy(dropBehind=" + dropBehind + ", readahead=" + readahead + ")"; Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Fri Dec 20 01:01:18 2013 @@ -78,18 +78,6 @@ public class DatanodeJspHelper { } /** - * Internal convenience method for canonicalizing host name. - * @param addr name:port or name - * @return canonicalized host name - */ - private static String canonicalize(String addr) { - // default port 1 is supplied to allow addr without port. - // the port will be ignored. - return NetUtils.createSocketAddr(addr, 1).getAddress() - .getCanonicalHostName(); - } - - /** * Get the default chunk size. * @param conf the configuration * @return the number of bytes to chunk in @@ -228,7 +216,7 @@ public class DatanodeJspHelper { } } out.print("
Go back to DFS home"); dfs.close(); } @@ -359,7 +347,7 @@ public class DatanodeJspHelper { // generate a table and dump the info out.println("\n"); - String nnCanonicalName = canonicalize(nnAddr); + String nnCanonicalName = JspHelper.canonicalize(nnAddr); for (LocatedBlock cur : blocks) { out.print(""); final String blockidstring = Long.toString(cur.getBlock().getBlockId()); Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Fri Dec 20 01:01:18 2013 @@ -27,11 +27,12 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT; import java.io.DataInput; -import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -45,13 +46,16 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; +import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.InvalidRequestException; +import org.apache.hadoop.fs.UnresolvedLinkException; 
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirective; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.DatanodeID; @@ -68,7 +72,7 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; -import org.apache.hadoop.io.Text; +import org.apache.hadoop.hdfs.util.ReadOnlyList; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LightWeightGSet; @@ -342,6 +346,67 @@ public final class CacheManager { } /** + * Throws an exception if the CachePool does not have enough capacity to + * cache the given path at the replication factor. 
+ * + * @param pool CachePool where the path is being cached + * @param path Path that is being cached + * @param replication Replication factor of the path + * @throws InvalidRequestException if the pool does not have enough capacity + */ + private void checkLimit(CachePool pool, String path, + short replication) throws InvalidRequestException { + CacheDirectiveStats stats = computeNeeded(path, replication); + if (pool.getBytesNeeded() + (stats.getBytesNeeded() * replication) > pool + .getLimit()) { + throw new InvalidRequestException("Caching path " + path + " of size " + + stats.getBytesNeeded() / replication + " bytes at replication " + + replication + " would exceed pool " + pool.getPoolName() + + "'s remaining capacity of " + + (pool.getLimit() - pool.getBytesNeeded()) + " bytes."); + } + } + + /** + * Computes the needed number of bytes and files for a path. + * @return CacheDirectiveStats describing the needed stats for this path + */ + private CacheDirectiveStats computeNeeded(String path, short replication) { + FSDirectory fsDir = namesystem.getFSDirectory(); + INode node; + long requestedBytes = 0; + long requestedFiles = 0; + CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder(); + try { + node = fsDir.getINode(path); + } catch (UnresolvedLinkException e) { + // We don't cache through symlinks + return builder.build(); + } + if (node == null) { + return builder.build(); + } + if (node.isFile()) { + requestedFiles = 1; + INodeFile file = node.asFile(); + requestedBytes = file.computeFileSize(); + } else if (node.isDirectory()) { + INodeDirectory dir = node.asDirectory(); + ReadOnlyList children = dir.getChildrenList(null); + requestedFiles = children.size(); + for (INode child : children) { + if (child.isFile()) { + requestedBytes += child.asFile().computeFileSize(); + } + } + } + return new CacheDirectiveStats.Builder() + .setBytesNeeded(requestedBytes) + .setFilesCached(requestedFiles) + .build(); + } + + /** * Get a CacheDirective by 
ID, validating the ID and that the directive * exists. */ @@ -384,6 +449,15 @@ public final class CacheManager { directivesByPath.put(path, directives); } directives.add(directive); + // Fix up pool stats + CacheDirectiveStats stats = + computeNeeded(directive.getPath(), directive.getReplication()); + directive.addBytesNeeded(stats.getBytesNeeded()); + directive.addFilesNeeded(directive.getFilesNeeded()); + + if (monitor != null) { + monitor.setNeedsRescan(); + } } /** @@ -407,7 +481,7 @@ public final class CacheManager { } public CacheDirectiveInfo addDirective( - CacheDirectiveInfo info, FSPermissionChecker pc) + CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet flags) throws IOException { assert namesystem.hasWriteLock(); CacheDirective directive; @@ -418,6 +492,14 @@ public final class CacheManager { short replication = validateReplication(info, (short)1); long expiryTime = validateExpiryTime(info, CacheDirectiveInfo.Expiration.EXPIRY_NEVER); + // Do quota validation if required + if (!flags.contains(CacheFlag.FORCE)) { + // Can't kick and wait if caching is disabled + if (monitor != null) { + monitor.waitForRescan(); + } + checkLimit(pool, path, replication); + } // All validation passed // Add a new entry with the next available ID. long id = getNextDirectiveId(); @@ -428,14 +510,11 @@ public final class CacheManager { throw e; } LOG.info("addDirective of " + info + " successful."); - if (monitor != null) { - monitor.kick(); - } return directive.toInfo(); } public void modifyDirective(CacheDirectiveInfo info, - FSPermissionChecker pc) throws IOException { + FSPermissionChecker pc, EnumSet flags) throws IOException { assert namesystem.hasWriteLock(); String idString = (info.getId() == null) ? 
@@ -463,6 +542,13 @@ public final class CacheManager { if (info.getPool() != null) { pool = getCachePool(validatePoolName(info)); checkWritePermission(pc, pool); + if (!flags.contains(CacheFlag.FORCE)) { + // Can't kick and wait if caching is disabled + if (monitor != null) { + monitor.waitForRescan(); + } + checkLimit(pool, path, replication); + } } removeInternal(prevEntry); CacheDirective newEntry = @@ -489,9 +575,18 @@ public final class CacheManager { if (directives.size() == 0) { directivesByPath.remove(path); } + // Fix up the stats from removing the pool + final CachePool pool = directive.getPool(); + directive.addBytesNeeded(-directive.getBytesNeeded()); + directive.addFilesNeeded(-directive.getFilesNeeded()); + directivesById.remove(directive.getId()); - directive.getPool().getDirectiveList().remove(directive); + pool.getDirectiveList().remove(directive); assert directive.getPool() == null; + + if (monitor != null) { + monitor.setNeedsRescan(); + } } public void removeDirective(long id, FSPermissionChecker pc) @@ -505,9 +600,6 @@ public final class CacheManager { LOG.warn("removeDirective of " + id + " failed: ", e); throw e; } - if (monitor != null) { - monitor.kick(); - } LOG.info("removeDirective of " + id + " successful."); } @@ -527,6 +619,9 @@ public final class CacheManager { if (filter.getReplication() != null) { throw new IOException("Filtering by replication is unsupported."); } + if (monitor != null) { + monitor.waitForRescanIfNeeded(); + } ArrayList replies = new ArrayList(NUM_PRE_ALLOCATED_ENTRIES); int numReplies = 0; @@ -573,16 +668,22 @@ public final class CacheManager { public CachePoolInfo addCachePool(CachePoolInfo info) throws IOException { assert namesystem.hasWriteLock(); - CachePoolInfo.validate(info); - String poolName = info.getPoolName(); - CachePool pool = cachePools.get(poolName); - if (pool != null) { - throw new InvalidRequestException("Cache pool " + poolName - + " already exists."); - } - pool = 
CachePool.createFromInfoAndDefaults(info); - cachePools.put(pool.getPoolName(), pool); - LOG.info("Created new cache pool " + pool); + CachePool pool; + try { + CachePoolInfo.validate(info); + String poolName = info.getPoolName(); + pool = cachePools.get(poolName); + if (pool != null) { + throw new InvalidRequestException("Cache pool " + poolName + + " already exists."); + } + pool = CachePool.createFromInfoAndDefaults(info); + cachePools.put(pool.getPoolName(), pool); + } catch (IOException e) { + LOG.info("addCachePool of " + info + " failed: ", e); + throw e; + } + LOG.info("addCachePool of " + info + " successful."); return pool.getInfo(true); } @@ -597,42 +698,51 @@ public final class CacheManager { public void modifyCachePool(CachePoolInfo info) throws IOException { assert namesystem.hasWriteLock(); - CachePoolInfo.validate(info); - String poolName = info.getPoolName(); - CachePool pool = cachePools.get(poolName); - if (pool == null) { - throw new InvalidRequestException("Cache pool " + poolName - + " does not exist."); - } StringBuilder bld = new StringBuilder(); - String prefix = ""; - if (info.getOwnerName() != null) { - pool.setOwnerName(info.getOwnerName()); - bld.append(prefix). - append("set owner to ").append(info.getOwnerName()); - prefix = "; "; - } - if (info.getGroupName() != null) { - pool.setGroupName(info.getGroupName()); - bld.append(prefix). - append("set group to ").append(info.getGroupName()); - prefix = "; "; - } - if (info.getMode() != null) { - pool.setMode(info.getMode()); - bld.append(prefix).append("set mode to " + info.getMode()); - prefix = "; "; - } - if (info.getWeight() != null) { - pool.setWeight(info.getWeight()); - bld.append(prefix). 
- append("set weight to ").append(info.getWeight()); - prefix = "; "; - } - if (prefix.isEmpty()) { - bld.append("no changes."); + try { + CachePoolInfo.validate(info); + String poolName = info.getPoolName(); + CachePool pool = cachePools.get(poolName); + if (pool == null) { + throw new InvalidRequestException("Cache pool " + poolName + + " does not exist."); + } + String prefix = ""; + if (info.getOwnerName() != null) { + pool.setOwnerName(info.getOwnerName()); + bld.append(prefix). + append("set owner to ").append(info.getOwnerName()); + prefix = "; "; + } + if (info.getGroupName() != null) { + pool.setGroupName(info.getGroupName()); + bld.append(prefix). + append("set group to ").append(info.getGroupName()); + prefix = "; "; + } + if (info.getMode() != null) { + pool.setMode(info.getMode()); + bld.append(prefix).append("set mode to " + info.getMode()); + prefix = "; "; + } + if (info.getLimit() != null) { + pool.setLimit(info.getLimit()); + bld.append(prefix).append("set limit to " + info.getLimit()); + prefix = "; "; + // New limit changes stats, need to set needs refresh + if (monitor != null) { + monitor.setNeedsRescan(); + } + } + if (prefix.isEmpty()) { + bld.append("no changes."); + } + } catch (IOException e) { + LOG.info("modifyCachePool of " + info + " failed: ", e); + throw e; } - LOG.info("modified " + poolName + "; " + bld.toString()); + LOG.info("modifyCachePool of " + info.getPoolName() + " successful; " + + bld.toString()); } /** @@ -646,28 +756,37 @@ public final class CacheManager { public void removeCachePool(String poolName) throws IOException { assert namesystem.hasWriteLock(); - CachePoolInfo.validateName(poolName); - CachePool pool = cachePools.remove(poolName); - if (pool == null) { - throw new InvalidRequestException( - "Cannot remove non-existent cache pool " + poolName); - } - // Remove all directives in this pool. 
- Iterator iter = pool.getDirectiveList().iterator(); - while (iter.hasNext()) { - CacheDirective directive = iter.next(); - directivesByPath.remove(directive.getPath()); - directivesById.remove(directive.getId()); - iter.remove(); - } - if (monitor != null) { - monitor.kick(); + try { + CachePoolInfo.validateName(poolName); + CachePool pool = cachePools.remove(poolName); + if (pool == null) { + throw new InvalidRequestException( + "Cannot remove non-existent cache pool " + poolName); + } + // Remove all directives in this pool. + Iterator iter = pool.getDirectiveList().iterator(); + while (iter.hasNext()) { + CacheDirective directive = iter.next(); + directivesByPath.remove(directive.getPath()); + directivesById.remove(directive.getId()); + iter.remove(); + } + if (monitor != null) { + monitor.setNeedsRescan(); + } + } catch (IOException e) { + LOG.info("removeCachePool of " + poolName + " failed: ", e); + throw e; } + LOG.info("removeCachePool of " + poolName + " successful."); } public BatchedListEntries listCachePools(FSPermissionChecker pc, String prevKey) { assert namesystem.hasReadLock(); + if (monitor != null) { + monitor.waitForRescanIfNeeded(); + } final int NUM_PRE_ALLOCATED_ENTRIES = 16; ArrayList results = new ArrayList(NUM_PRE_ALLOCATED_ENTRIES); @@ -782,7 +901,7 @@ public final class CacheManager { * @param sdPath path of the storage directory * @throws IOException */ - public void saveState(DataOutput out, String sdPath) + public void saveState(DataOutputStream out, String sdPath) throws IOException { out.writeLong(nextDirectiveId); savePools(out, sdPath); @@ -805,7 +924,7 @@ public final class CacheManager { /** * Save cache pools to fsimage */ - private void savePools(DataOutput out, + private void savePools(DataOutputStream out, String sdPath) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = new Step(StepType.CACHE_POOLS, sdPath); @@ -814,7 +933,7 @@ public final class CacheManager { Counter counter = 
prog.getCounter(Phase.SAVING_CHECKPOINT, step); out.writeInt(cachePools.size()); for (CachePool pool: cachePools.values()) { - pool.getInfo(true).writeTo(out); + FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true)); counter.increment(); } prog.endStep(Phase.SAVING_CHECKPOINT, step); @@ -823,7 +942,7 @@ public final class CacheManager { /* * Save cache entries to fsimage */ - private void saveDirectives(DataOutput out, String sdPath) + private void saveDirectives(DataOutputStream out, String sdPath) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = new Step(StepType.CACHE_ENTRIES, sdPath); @@ -832,11 +951,7 @@ public final class CacheManager { Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); out.writeInt(directivesById.size()); for (CacheDirective directive : directivesById.values()) { - out.writeLong(directive.getId()); - Text.writeString(out, directive.getPath()); - out.writeShort(directive.getReplication()); - Text.writeString(out, directive.getPool().getPoolName()); - out.writeLong(directive.getExpiryTime()); + FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo()); counter.increment(); } prog.endStep(Phase.SAVING_CHECKPOINT, step); @@ -854,7 +969,7 @@ public final class CacheManager { prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); for (int i = 0; i < numberOfPools; i++) { - addCachePool(CachePoolInfo.readFrom(in)); + addCachePool(FSImageSerialization.readCachePoolInfo(in)); counter.increment(); } prog.endStep(Phase.LOADING_FSIMAGE, step); @@ -871,19 +986,17 @@ public final class CacheManager { prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); for (int i = 0; i < numDirectives; i++) { - long directiveId = in.readLong(); - String path = Text.readString(in); - short replication = in.readShort(); - String poolName = Text.readString(in); - 
long expiryTime = in.readLong(); + CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in); // Get pool reference by looking it up in the map + final String poolName = info.getPool(); CachePool pool = cachePools.get(poolName); if (pool == null) { throw new IOException("Directive refers to pool " + poolName + ", which does not exist."); } CacheDirective directive = - new CacheDirective(directiveId, path, replication, expiryTime); + new CacheDirective(info.getId(), info.getPath().toUri().getPath(), + info.getReplication(), info.getExpiration().getAbsoluteMillis()); boolean addedDirective = pool.getDirectiveList().add(directive); assert addedDirective; if (directivesById.put(directive.getId(), directive) != null) { Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java Fri Dec 20 01:01:18 2013 @@ -49,8 +49,8 @@ import com.google.common.base.Preconditi public final class CachePool { public static final Log LOG = LogFactory.getLog(CachePool.class); - public static final int DEFAULT_WEIGHT = 100; - + public static final long DEFAULT_LIMIT = Long.MAX_VALUE; + @Nonnull private final String poolName; @@ -71,7 +71,10 @@ public final class CachePool { @Nonnull private FsPermission mode; - private int weight; + /** + * Maximum number of bytes that can be cached in this pool. 
+ */ + private long limit; private long bytesNeeded; private long bytesCached; @@ -118,10 +121,10 @@ public final class CachePool { } FsPermission mode = (info.getMode() == null) ? FsPermission.getCachePoolDefault() : info.getMode(); - Integer weight = (info.getWeight() == null) ? - DEFAULT_WEIGHT : info.getWeight(); + long limit = info.getLimit() == null ? + DEFAULT_LIMIT : info.getLimit(); return new CachePool(info.getPoolName(), - ownerName, groupName, mode, weight); + ownerName, groupName, mode, limit); } /** @@ -131,11 +134,11 @@ public final class CachePool { static CachePool createFromInfo(CachePoolInfo info) { return new CachePool(info.getPoolName(), info.getOwnerName(), info.getGroupName(), - info.getMode(), info.getWeight()); + info.getMode(), info.getLimit()); } CachePool(String poolName, String ownerName, String groupName, - FsPermission mode, int weight) { + FsPermission mode, long limit) { Preconditions.checkNotNull(poolName); Preconditions.checkNotNull(ownerName); Preconditions.checkNotNull(groupName); @@ -144,7 +147,7 @@ public final class CachePool { this.ownerName = ownerName; this.groupName = groupName; this.mode = new FsPermission(mode); - this.weight = weight; + this.limit = limit; } public String getPoolName() { @@ -177,16 +180,16 @@ public final class CachePool { this.mode = new FsPermission(mode); return this; } - - public int getWeight() { - return weight; + + public long getLimit() { + return limit; } - public CachePool setWeight(int weight) { - this.weight = weight; + public CachePool setLimit(long bytes) { + this.limit = bytes; return this; } - + /** * Get either full or partial information about this CachePool. * @@ -204,7 +207,7 @@ public final class CachePool { return info.setOwnerName(ownerName). setGroupName(groupName). setMode(new FsPermission(mode)). 
- setWeight(weight); + setLimit(limit); } /** @@ -241,6 +244,10 @@ public final class CachePool { return bytesCached; } + public long getBytesOverlimit() { + return Math.max(bytesNeeded-limit, 0); + } + public long getFilesNeeded() { return filesNeeded; } @@ -258,6 +265,7 @@ public final class CachePool { return new CachePoolStats.Builder(). setBytesNeeded(bytesNeeded). setBytesCached(bytesCached). + setBytesOverlimit(getBytesOverlimit()). setFilesNeeded(filesNeeded). setFilesCached(filesCached). build(); @@ -291,7 +299,7 @@ public final class CachePool { append(", ownerName:").append(ownerName). append(", groupName:").append(groupName). append(", mode:").append(mode). - append(", weight:").append(weight). + append(", limit:").append(limit). append(" }").toString(); } Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Dec 20 01:01:18 2013 @@ -160,10 +160,10 @@ public class FSEditLog implements LogsPu private long totalTimeTransactions; // total time for all transactions private NameNodeMetrics metrics; - private NNStorage storage; - private Configuration conf; + private final NNStorage storage; + private final Configuration conf; - private List editsDirs; + private final List editsDirs; private ThreadLocal cache = new ThreadLocal() { @@ -176,7 +176,7 @@ public class FSEditLog implements LogsPu /** * The 
edit directories that are shared between primary and secondary. */ - private List sharedEditsDirs; + private final List sharedEditsDirs; private static class TransactionId { public long txid; @@ -203,10 +203,6 @@ public class FSEditLog implements LogsPu * @param editsDirs List of journals to use */ FSEditLog(Configuration conf, NNStorage storage, List editsDirs) { - init(conf, storage, editsDirs); - } - - private void init(Configuration conf, NNStorage storage, List editsDirs) { isSyncRunning = false; this.conf = conf; this.storage = storage; Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Dec 20 01:01:18 2013 @@ -24,12 +24,14 @@ import java.io.IOException; import java.io.InputStream; import java.util.Arrays; import java.util.EnumMap; +import java.util.EnumSet; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -650,7 +652,7 @@ public class FSEditLogLoader { ModifyCacheDirectiveInfoOp modifyOp = 
(ModifyCacheDirectiveInfoOp) op; fsNamesys.getCacheManager().modifyDirective( - modifyOp.directive, null); + modifyOp.directive, null, EnumSet.of(CacheFlag.FORCE)); if (toAddRetryCache) { fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId); }