Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1E84010C10 for ; Mon, 2 Dec 2013 17:42:27 +0000 (UTC) Received: (qmail 23189 invoked by uid 500); 2 Dec 2013 17:42:19 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 23159 invoked by uid 500); 2 Dec 2013 17:42:18 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 23142 invoked by uid 99); 2 Dec 2013 17:42:17 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Dec 2013 17:42:17 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Dec 2013 17:42:13 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 2930F2388A6E; Mon, 2 Dec 2013 17:41:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1547122 [2/5] - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/dev-support/ h... Date: Mon, 02 Dec 2013 17:41:48 -0000 To: hdfs-commits@hadoop.apache.org From: arp@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131202174153.2930F2388A6E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Mon Dec 2 17:41:44 2013 @@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.web.URLConnectionFactory; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; @@ -87,6 +88,7 @@ public class QuorumJournalManager implem private final AsyncLoggerSet loggers; private int outputBufferCapacity = 512 * 1024; + private final URLConnectionFactory connectionFactory; public QuorumJournalManager(Configuration conf, URI uri, NamespaceInfo nsInfo) throws IOException { @@ -102,6 +104,8 @@ public class QuorumJournalManager implem this.uri = uri; this.nsInfo = nsInfo; this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory)); + this.connectionFactory = URLConnectionFactory + .newDefaultURLConnectionFactory(conf); // Configure timeouts. this.startSegmentTimeoutMs = conf.getInt( @@ -475,8 +479,8 @@ public class QuorumJournalManager implem URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId()); EditLogInputStream elis = EditLogFileInputStream.fromUrl( - url, remoteLog.getStartTxId(), remoteLog.getEndTxId(), - remoteLog.isInProgress()); + connectionFactory, url, remoteLog.getStartTxId(), + remoteLog.getEndTxId(), remoteLog.isInProgress()); allStreams.add(elis); } } Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java Mon Dec 2 17:41:44 2013 @@ -114,10 +114,10 @@ class QuorumOutputStream extends EditLog } @Override - public String generateHtmlReport() { + public String generateReport() { StringBuilder sb = new StringBuilder(); - sb.append("Writing segment beginning at txid " + segmentTxId + "
\n"); - loggers.appendHtmlReport(sb); + sb.append("Writing segment beginning at txid " + segmentTxId + ". \n"); + loggers.appendReport(sb); return sb.toString(); } Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java Mon Dec 2 17:41:44 2013 @@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFS import java.io.IOException; import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; import javax.servlet.ServletContext; @@ -69,8 +71,15 @@ public class JournalNodeHttpServer { bindAddr.getHostName())); int tmpInfoPort = bindAddr.getPort(); + URI httpEndpoint; + try { + httpEndpoint = new URI("http://" + NetUtils.getHostPortString(bindAddr)); + } catch (URISyntaxException e) { + throw new IOException(e); + } + httpServer = new HttpServer.Builder().setName("journal") - .setBindAddress(bindAddr.getHostName()).setPort(tmpInfoPort) + .addEndpoint(httpEndpoint) .setFindPort(tmpInfoPort == 0).setConf(conf).setACL( new AccessControlList(conf.get(DFS_ADMIN, " "))) .setSecurityEnabled(UserGroupInformation.isSecurityEnabled()) @@ -85,7 +94,7 @@ public class JournalNodeHttpServer { httpServer.start(); // The web-server port can be ephemeral... ensure we have the correct info - infoPort = httpServer.getPort(); + infoPort = httpServer.getConnectorAddress(0).getPort(); LOG.info("Journal Web-server up at: " + bindAddr + ":" + infoPort); } @@ -104,7 +113,7 @@ public class JournalNodeHttpServer { * Return the actual address bound to by the running server. */ public InetSocketAddress getAddress() { - InetSocketAddress addr = httpServer.getListenerAddress(); + InetSocketAddress addr = httpServer.getConnectorAddress(0); assert addr.getPort() != 0; return addr; } Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Mon Dec 2 17:41:44 2013 @@ -22,6 +22,7 @@ import static org.apache.hadoop.util.Exi import java.io.Closeable; import java.io.IOException; import java.util.Collection; +import java.util.Date; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -208,17 +209,27 @@ public class CacheReplicationMonitor ext /** * Scan all CacheDirectives. Use the information to figure out * what cache replication factor each block should have. - * - * @param mark Whether the current scan is setting or clearing the mark */ private void rescanCacheDirectives() { FSDirectory fsDir = namesystem.getFSDirectory(); - for (CacheDirective pce : cacheManager.getEntriesById().values()) { + final long now = new Date().getTime(); + for (CacheDirective directive : cacheManager.getEntriesById().values()) { + // Reset the directive + directive.clearBytesNeeded(); + directive.clearBytesCached(); + directive.clearFilesAffected(); + // Skip processing this entry if it has expired + LOG.info("Directive expiry is at " + directive.getExpiryTime()); + if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping directive id " + directive.getId() + + " because it has expired (" + directive.getExpiryTime() + ">=" + + now); + } + continue; + } scannedDirectives++; - pce.clearBytesNeeded(); - pce.clearBytesCached(); - pce.clearFilesAffected(); - String path = pce.getPath(); + String path = directive.getPath(); INode node; try { node = fsDir.getINode(path); @@ -235,11 +246,11 @@ public class CacheReplicationMonitor ext ReadOnlyList children = dir.getChildrenList(null); for (INode child : children) { if (child.isFile()) { - rescanFile(pce, child.asFile()); + rescanFile(directive, child.asFile()); } } } else if (node.isFile()) { - rescanFile(pce, node.asFile()); + rescanFile(directive, node.asFile()); } else { if (LOG.isDebugEnabled()) { LOG.debug("Ignoring non-directory, non-file inode " + node + @@ -301,7 +312,7 @@ public class CacheReplicationMonitor ext pce.addBytesNeeded(neededTotal); pce.addBytesCached(cachedTotal); if (LOG.isTraceEnabled()) { - LOG.debug("Directive " + pce.getEntryId() + " is caching " + + LOG.debug("Directive " + pce.getId() + " is caching " + file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal); } } Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Mon Dec 2 17:41:44 2013 @@ -42,6 +42,12 @@ public interface DatanodeStatistics { /** @return the percentage of the block pool used space over the total capacity. */ public float getPercentBlockPoolUsed(); + + /** @return the total cache capacity of all DataNodes */ + public long getCacheCapacity(); + + /** @return the total cache used by all DataNodes */ + public long getCacheUsed(); /** @return the xceiver count */ public int getXceiverCount(); Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Mon Dec 2 17:41:44 2013 @@ -149,6 +149,17 @@ class HeartbeatManager implements Datano public synchronized int getXceiverCount() { return stats.xceiverCount; } + + @Override + public synchronized long getCacheCapacity() { + return stats.cacheCapacity; + } + + @Override + public synchronized long getCacheUsed() { + return stats.cacheUsed; + } + @Override public synchronized long[] getStats() { @@ -309,6 +320,8 @@ class HeartbeatManager implements Datano private long capacityRemaining = 0L; private long blockPoolUsed = 0L; private int xceiverCount = 0; + private long cacheCapacity = 0L; + private long cacheUsed = 0L; private int expiredHeartbeats = 0; @@ -322,6 +335,8 @@ class HeartbeatManager implements Datano } else { capacityTotal += node.getDfsUsed(); } + cacheCapacity += node.getCacheCapacity(); + cacheUsed += node.getCacheUsed(); } private void subtract(final DatanodeDescriptor node) { @@ -334,6 +349,8 @@ class HeartbeatManager implements Datano } else { capacityTotal -= node.getDfsUsed(); } + cacheCapacity -= node.getCacheCapacity(); + cacheUsed -= node.getCacheUsed(); } /** Increment expired heartbeat counter. */ Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Dec 2 17:41:44 2013 @@ -52,6 +52,7 @@ import java.util.concurrent.atomic.Atomi import javax.management.ObjectName; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -234,6 +235,7 @@ public class DataNode extends Configured private volatile boolean heartbeatsDisabledForTests = false; private DataStorage storage = null; private HttpServer infoServer = null; + private int infoPort; private int infoSecurePort; DataNodeMetrics metrics; private InetSocketAddress streamingAddr; @@ -354,27 +356,33 @@ public class DataNode extends Configured String infoHost = infoSocAddr.getHostName(); int tmpInfoPort = infoSocAddr.getPort(); HttpServer.Builder builder = new HttpServer.Builder().setName("datanode") - .setBindAddress(infoHost).setPort(tmpInfoPort) + .addEndpoint(URI.create("http://" + NetUtils.getHostPortString(infoSocAddr))) .setFindPort(tmpInfoPort == 0).setConf(conf) .setACL(new AccessControlList(conf.get(DFS_ADMIN, " "))); - this.infoServer = (secureResources == null) ? builder.build() : - builder.setConnector(secureResources.getListener()).build(); LOG.info("Opened info server at " + infoHost + ":" + tmpInfoPort); if (conf.getBoolean(DFS_HTTPS_ENABLE_KEY, false)) { - boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY, - DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT); InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get( DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 0)); - Configuration sslConf = new HdfsConfiguration(false); - sslConf.addResource(conf.get(DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY, - "ssl-server.xml")); - this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth); + builder.addEndpoint(URI.create("https://" + + NetUtils.getHostPortString(secInfoSocAddr))); + Configuration sslConf = new Configuration(false); + sslConf.setBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY, conf + .getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY, + DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT)); + sslConf.addResource(conf.get( + DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY, + DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT)); + DFSUtil.loadSslConfToHttpServerBuilder(builder, sslConf); + if(LOG.isDebugEnabled()) { LOG.debug("Datanode listening for SSL on " + secInfoSocAddr); } infoSecurePort = secInfoSocAddr.getPort(); } + + this.infoServer = (secureResources == null) ? builder.build() : + builder.setConnector(secureResources.getListener()).build(); this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class); this.infoServer.addInternalServlet(null, "/getFileChecksum/*", FileChecksumServlets.GetServlet.class); @@ -390,6 +398,7 @@ public class DataNode extends Configured WebHdfsFileSystem.PATH_PREFIX + "/*"); } this.infoServer.start(); + this.infoPort = infoServer.getConnectorAddress(0).getPort(); } private void startPlugins(Configuration conf) { @@ -712,7 +721,7 @@ public class DataNode extends Configured this.dnConf = new DNConf(conf); if (dnConf.maxLockedMemory > 0) { - if (!NativeIO.isAvailable()) { + if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) { throw new RuntimeException(String.format( "Cannot start datanode because the configured max locked memory" + " size (%s) is greater than zero and native code is not available.", @@ -2320,7 +2329,7 @@ public class DataNode extends Configured * @return the datanode's http port */ public int getInfoPort() { - return infoServer.getPort(); + return infoPort; } /** Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Mon Dec 2 17:41:44 2013 @@ -310,7 +310,16 @@ public class DataStorage extends Storage @Override protected void setFieldsFromProperties(Properties props, StorageDirectory sd) throws IOException { - setLayoutVersion(props, sd); + setFieldsFromProperties(props, sd, false, 0); + } + + private void setFieldsFromProperties(Properties props, StorageDirectory sd, + boolean overrideLayoutVersion, int toLayoutVersion) throws IOException { + if (overrideLayoutVersion) { + this.layoutVersion = toLayoutVersion; + } else { + setLayoutVersion(props, sd); + } setcTime(props, sd); setStorageType(props, sd); setClusterId(props, layoutVersion, sd); @@ -374,13 +383,20 @@ public class DataStorage extends Storage return true; } + /** Read VERSION file for rollback */ + void readProperties(StorageDirectory sd, int rollbackLayoutVersion) + throws IOException { + Properties props = readPropertiesFile(sd.getVersionFile()); + setFieldsFromProperties(props, sd, true, rollbackLayoutVersion); + } + /** * Analize which and whether a transition of the fs state is required * and perform it if necessary. * - * Rollback if previousLV >= LAYOUT_VERSION && prevCTime <= namenode.cTime - * Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime - * Regular startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime + * Rollback if the rollback startup option was specified. + * Upgrade if this.LV > LAYOUT_VERSION + * Regular startup if this.LV = LAYOUT_VERSION * * @param datanode Datanode to which this storage belongs to * @param sd storage directory @@ -420,25 +436,28 @@ public class DataStorage extends Storage + nsInfo.getClusterID() + "; datanode clusterID = " + getClusterID()); } - // regular start up - if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION - && this.cTime == nsInfo.getCTime()) + // After addition of the federation feature, ctime check is only + // meaningful at BlockPoolSliceStorage level. + + // regular start up. + if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION) return; // regular startup // do upgrade - if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION - || this.cTime < nsInfo.getCTime()) { + if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION) { doUpgrade(sd, nsInfo); // upgrade return; } - // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime - // must shutdown - throw new IOException("Datanode state: LV = " + this.getLayoutVersion() - + " CTime = " + this.getCTime() - + " is newer than the namespace state: LV = " - + nsInfo.getLayoutVersion() - + " CTime = " + nsInfo.getCTime()); + // layoutVersion < LAYOUT_VERSION. I.e. stored layout version is newer + // than the version supported by datanode. This should have been caught + // in readProperties(), even if rollback was not carried out or somehow + // failed. + throw new IOException("BUG: The stored LV = " + this.getLayoutVersion() + + " is newer than the supported LV = " + + HdfsConstants.LAYOUT_VERSION + + " or name node LV = " + + nsInfo.getLayoutVersion()); } /** @@ -464,8 +483,13 @@ public class DataStorage extends Storage * @throws IOException on error */ void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException { + // If the existing on-disk layout version supportes federation, simply + // update its layout version. if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) { - clusterID = nsInfo.getClusterID(); + // The VERSION file is already read in. Override the layoutVersion + // field and overwrite the file. + LOG.info("Updating layout version from " + layoutVersion + " to " + + nsInfo.getLayoutVersion() + " for storage " + sd.getRoot()); layoutVersion = nsInfo.getLayoutVersion(); writeProperties(sd); return; @@ -550,15 +574,32 @@ public class DataStorage extends Storage *
  • Remove removed.tmp
  • * * - * Do nothing, if previous directory does not exist. + * If previous directory does not exist and the current version supports + * federation, perform a simple rollback of layout version. This does not + * involve saving/restoration of actual data. */ void doRollback( StorageDirectory sd, NamespaceInfo nsInfo ) throws IOException { File prevDir = sd.getPreviousDir(); - // regular startup if previous dir does not exist - if (!prevDir.exists()) + // This is a regular startup or a post-federation rollback + if (!prevDir.exists()) { + // The current datanode version supports federation and the layout + // version from namenode matches what the datanode supports. An invalid + // rollback may happen if namenode didn't rollback and datanode is + // running a wrong version. But this will be detected in block pool + // level and the invalid VERSION content will be overwritten when + // the error is corrected and rollback is retried. + if (LayoutVersion.supports(Feature.FEDERATION, + HdfsConstants.LAYOUT_VERSION) && + HdfsConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion()) { + readProperties(sd, nsInfo.getLayoutVersion()); + writeProperties(sd); + LOG.info("Layout version rolled back to " + + nsInfo.getLayoutVersion() + " for storage " + sd.getRoot()); + } return; + } DataStorage prevInfo = new DataStorage(); prevInfo.readPreviousVersionProperties(sd); Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Mon Dec 2 17:41:44 2013 @@ -145,6 +145,8 @@ public class FsDatasetCache { */ private final HashMap mappableBlockMap = new HashMap(); + private final AtomicLong numBlocksCached = new AtomicLong(0); + private final FsDatasetImpl dataset; private final ThreadPoolExecutor uncachingExecutor; @@ -417,6 +419,7 @@ public class FsDatasetCache { LOG.debug("Successfully cached block " + key.id + " in " + key.bpid + ". We are now caching " + newUsedBytes + " bytes in total."); } + numBlocksCached.addAndGet(1); success = true; } finally { if (!success) { @@ -465,6 +468,7 @@ public class FsDatasetCache { } long newUsedBytes = usedBytesCount.release(value.mappableBlock.getLength()); + numBlocksCached.addAndGet(-1); if (LOG.isDebugEnabled()) { LOG.debug("Uncaching of block " + key.id + " in " + key.bpid + " completed. usedBytes = " + newUsedBytes); @@ -477,14 +481,14 @@ public class FsDatasetCache { /** * Get the approximate amount of cache space used. */ - public long getDnCacheUsed() { + public long getCacheUsed() { return usedBytesCount.get(); } /** * Get the maximum amount of bytes we can cache. This is a constant. */ - public long getDnCacheCapacity() { + public long getCacheCapacity() { return maxBytes; } @@ -496,4 +500,7 @@ public class FsDatasetCache { return numBlocksFailedToUncache.get(); } + public long getNumBlocksCached() { + return numBlocksCached.get(); + } } Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Mon Dec 2 17:41:44 2013 @@ -341,12 +341,12 @@ class FsDatasetImpl implements FsDataset @Override // FSDatasetMBean public long getCacheUsed() { - return cacheManager.getDnCacheUsed(); + return cacheManager.getCacheUsed(); } @Override // FSDatasetMBean public long getCacheCapacity() { - return cacheManager.getDnCacheCapacity(); + return cacheManager.getCacheCapacity(); } @Override // FSDatasetMBean @@ -359,6 +359,11 @@ class FsDatasetImpl implements FsDataset return cacheManager.getNumBlocksFailedToUncache(); } + @Override // FSDatasetMBean + public long getNumBlocksCached() { + return cacheManager.getNumBlocksCached(); + } + /** * Find the block's on-disk length */ Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java Mon Dec 2 17:41:44 2013 @@ -89,6 +89,11 @@ public interface FSDatasetMBean { public long getCacheCapacity(); /** + * Returns the number of blocks cached. + */ + public long getNumBlocksCached(); + + /** * Returns the number of blocks that the datanode was unable to cache */ public long getNumBlocksFailedToCache(); Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Mon Dec 2 17:41:44 2013 @@ -17,12 +17,12 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT; @@ -43,17 +43,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; +import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.CacheDirective; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; -import org.apache.hadoop.hdfs.protocol.CacheDirective; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor; @@ -99,24 +100,24 @@ public final class CacheManager { private final BlockManager blockManager; /** - * Cache entries, sorted by ID. + * Cache directives, sorted by ID. * * listCacheDirectives relies on the ordering of elements in this map * to track what has already been listed by the client. */ - private final TreeMap entriesById = + private final TreeMap directivesById = new TreeMap(); /** - * The entry ID to use for a new entry. Entry IDs always increase, and are + * The directive ID to use for a new directive. IDs always increase, and are * never reused. */ - private long nextEntryId; + private long nextDirectiveId; /** - * Cache entries, sorted by path + * Cache directives, sorted by path */ - private final TreeMap> entriesByPath = + private final TreeMap> directivesByPath = new TreeMap>(); /** @@ -177,7 +178,7 @@ public final class CacheManager { BlockManager blockManager) { this.namesystem = namesystem; this.blockManager = blockManager; - this.nextEntryId = 1; + this.nextDirectiveId = 1; this.maxListCachePoolsResponses = conf.getInt( DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT); @@ -239,7 +240,7 @@ public final class CacheManager { public TreeMap getEntriesById() { assert namesystem.hasReadLock(); - return entriesById; + return directivesById; } @VisibleForTesting @@ -248,12 +249,12 @@ public final class CacheManager { return cachedBlocks; } - private long getNextEntryId() throws IOException { + private long getNextDirectiveId() throws IOException { assert namesystem.hasWriteLock(); - if (nextEntryId >= Long.MAX_VALUE - 1) { + if (nextDirectiveId >= Long.MAX_VALUE - 1) { throw new IOException("No more available IDs."); } - return nextEntryId++; + return nextDirectiveId++; } // Helper getter / validation methods @@ -301,7 +302,35 @@ public final class CacheManager { } /** - * Get a CacheDirective by ID, validating the ID and that the entry + * Calculates the absolute expiry time of the directive from the + * {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration + * into an absolute time based on the local clock. + * + * @param directive from which to get the expiry time + * @param defaultValue to use if Expiration is not set + * @return Absolute expiry time in milliseconds since Unix epoch + * @throws InvalidRequestException if the Expiration is invalid + */ + private static long validateExpiryTime(CacheDirectiveInfo directive, + long defaultValue) throws InvalidRequestException { + long expiryTime; + CacheDirectiveInfo.Expiration expiration = directive.getExpiration(); + if (expiration != null) { + if (expiration.getMillis() < 0) { + throw new InvalidRequestException("Cannot set a negative expiration: " + + expiration.getMillis()); + } + // Converts a relative duration into an absolute time based on the local + // clock + expiryTime = expiration.getAbsoluteMillis(); + } else { + expiryTime = defaultValue; + } + return expiryTime; + } + + /** + * Get a CacheDirective by ID, validating the ID and that the directive * exists. */ private CacheDirective getById(long id) throws InvalidRequestException { @@ -309,13 +338,13 @@ public final class CacheManager { if (id <= 0) { throw new InvalidRequestException("Invalid negative ID."); } - // Find the entry. - CacheDirective entry = entriesById.get(id); - if (entry == null) { + // Find the directive. + CacheDirective directive = directivesById.get(id); + if (directive == null) { throw new InvalidRequestException("No directive with ID " + id + " found."); } - return entry; + return directive; } /** @@ -332,122 +361,134 @@ public final class CacheManager { // RPC handlers - private void addInternal(CacheDirective entry) { - entriesById.put(entry.getEntryId(), entry); - String path = entry.getPath(); - List entryList = entriesByPath.get(path); - if (entryList == null) { - entryList = new ArrayList(1); - entriesByPath.put(path, entryList); + private void addInternal(CacheDirective directive, CachePool pool) { + boolean addedDirective = pool.getDirectiveList().add(directive); + assert addedDirective; + directivesById.put(directive.getId(), directive); + String path = directive.getPath(); + List directives = directivesByPath.get(path); + if (directives == null) { + directives = new ArrayList(1); + directivesByPath.put(path, directives); } - entryList.add(entry); + directives.add(directive); + } + + /** + * To be called only from the edit log loading code + */ + CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive) + throws InvalidRequestException { + long id = directive.getId(); + CacheDirective entry = + new CacheDirective( + directive.getId(), + directive.getPath().toUri().getPath(), + directive.getReplication(), + directive.getExpiration().getAbsoluteMillis()); + CachePool pool = cachePools.get(directive.getPool()); + addInternal(entry, pool); + if (nextDirectiveId <= id) { + nextDirectiveId = id + 1; + } + return entry.toInfo(); } public CacheDirectiveInfo addDirective( - CacheDirectiveInfo directive, FSPermissionChecker pc) + CacheDirectiveInfo info, FSPermissionChecker pc) throws IOException { assert namesystem.hasWriteLock(); - CacheDirective entry; + CacheDirective directive; try { - CachePool pool = getCachePool(validatePoolName(directive)); + CachePool pool = getCachePool(validatePoolName(info)); checkWritePermission(pc, pool); - String path = validatePath(directive); - short replication = validateReplication(directive, (short)1); - long id; - if (directive.getId() != null) { - // We are loading an entry from the edit log. - // Use the ID from the edit log. - id = directive.getId(); - if (id <= 0) { - throw new InvalidRequestException("can't add an ID " + - "of " + id + ": it is not positive."); - } - if (id >= Long.MAX_VALUE) { - throw new InvalidRequestException("can't add an ID " + - "of " + id + ": it is too big."); - } - if (nextEntryId <= id) { - nextEntryId = id + 1; - } - } else { - // Add a new entry with the next available ID. - id = getNextEntryId(); - } - entry = new CacheDirective(id, path, replication, pool); - addInternal(entry); + String path = validatePath(info); + short replication = validateReplication(info, (short)1); + long expiryTime = validateExpiryTime(info, + CacheDirectiveInfo.Expiration.EXPIRY_NEVER); + // All validation passed + // Add a new entry with the next available ID. + long id = getNextDirectiveId(); + directive = new CacheDirective(id, path, replication, expiryTime); + addInternal(directive, pool); } catch (IOException e) { - LOG.warn("addDirective of " + directive + " failed: ", e); + LOG.warn("addDirective of " + info + " failed: ", e); throw e; } - LOG.info("addDirective of " + directive + " successful."); + LOG.info("addDirective of " + info + " successful."); if (monitor != null) { monitor.kick(); } - return entry.toDirective(); + return directive.toInfo(); } - public void modifyDirective(CacheDirectiveInfo directive, + public void modifyDirective(CacheDirectiveInfo info, FSPermissionChecker pc) throws IOException { assert namesystem.hasWriteLock(); String idString = - (directive.getId() == null) ? - "(null)" : directive.getId().toString(); + (info.getId() == null) ? + "(null)" : info.getId().toString(); try { // Check for invalid IDs. - Long id = directive.getId(); + Long id = info.getId(); if (id == null) { throw new InvalidRequestException("Must supply an ID."); } CacheDirective prevEntry = getById(id); checkWritePermission(pc, prevEntry.getPool()); String path = prevEntry.getPath(); - if (directive.getPath() != null) { - path = validatePath(directive); + if (info.getPath() != null) { + path = validatePath(info); } + short replication = prevEntry.getReplication(); - if (directive.getReplication() != null) { - replication = validateReplication(directive, replication); - } + replication = validateReplication(info, replication); + + long expiryTime = prevEntry.getExpiryTime(); + expiryTime = validateExpiryTime(info, expiryTime); + CachePool pool = prevEntry.getPool(); - if (directive.getPool() != null) { - pool = getCachePool(validatePoolName(directive)); + if (info.getPool() != null) { + pool = getCachePool(validatePoolName(info)); checkWritePermission(pc, pool); } removeInternal(prevEntry); CacheDirective newEntry = - new CacheDirective(id, path, replication, pool); - addInternal(newEntry); + new CacheDirective(id, path, replication, expiryTime); + addInternal(newEntry, pool); } catch (IOException e) { LOG.warn("modifyDirective of " + idString + " failed: ", e); throw e; } LOG.info("modifyDirective of " + idString + " successfully applied " + - directive + "."); + info+ "."); } - public void removeInternal(CacheDirective existing) + public void removeInternal(CacheDirective directive) throws InvalidRequestException { assert namesystem.hasWriteLock(); - // Remove the corresponding entry in entriesByPath. - String path = existing.getPath(); - List entries = entriesByPath.get(path); - if (entries == null || !entries.remove(existing)) { + // Remove the corresponding entry in directivesByPath. + String path = directive.getPath(); + List directives = directivesByPath.get(path); + if (directives == null || !directives.remove(directive)) { throw new InvalidRequestException("Failed to locate entry " + - existing.getEntryId() + " by path " + existing.getPath()); + directive.getId() + " by path " + directive.getPath()); } - if (entries.size() == 0) { - entriesByPath.remove(path); + if (directives.size() == 0) { + directivesByPath.remove(path); } - entriesById.remove(existing.getEntryId()); + directivesById.remove(directive.getId()); + directive.getPool().getDirectiveList().remove(directive); + assert directive.getPool() == null; } public void removeDirective(long id, FSPermissionChecker pc) throws IOException { assert namesystem.hasWriteLock(); try { - CacheDirective existing = getById(id); - checkWritePermission(pc, existing.getPool()); - removeInternal(existing); + CacheDirective directive = getById(id); + checkWritePermission(pc, directive.getPool()); + removeInternal(directive); } catch (IOException e) { LOG.warn("removeDirective of " + id + " failed: ", e); throw e; @@ -478,13 +519,13 @@ public final class CacheManager { new ArrayList(NUM_PRE_ALLOCATED_ENTRIES); int numReplies = 0; SortedMap tailMap = - entriesById.tailMap(prevId + 1); + directivesById.tailMap(prevId + 1); for (Entry cur : tailMap.entrySet()) { if (numReplies >= maxListCacheDirectivesNumResponses) { return new BatchedListEntries(replies, true); } - CacheDirective curEntry = cur.getValue(); - CacheDirectiveInfo info = cur.getValue().toDirective(); + CacheDirective curDirective = cur.getValue(); + CacheDirectiveInfo info = cur.getValue().toInfo(); if (filter.getPool() != null && !info.getPool().equals(filter.getPool())) { continue; @@ -496,7 +537,7 @@ public final class CacheManager { boolean hasPermission = true; if (pc != null) { try { - pc.checkPermission(curEntry.getPool(), FsAction.READ); + pc.checkPermission(curDirective.getPool(), FsAction.READ); } catch (AccessControlException e) { hasPermission = false; } @@ -530,7 +571,7 @@ public final class CacheManager { pool = CachePool.createFromInfoAndDefaults(info); cachePools.put(pool.getPoolName(), pool); LOG.info("Created new cache pool " + pool); - return pool.getInfo(null); + return pool.getInfo(true); } /** @@ -599,39 +640,34 @@ public final class CacheManager { throw new InvalidRequestException( "Cannot remove non-existent cache pool " + poolName); } - - // Remove entries using this pool - // TODO: could optimize this somewhat to avoid the need to iterate - // over all entries in entriesById - Iterator> iter = - entriesById.entrySet().iterator(); + // Remove all directives in this pool. + Iterator iter = pool.getDirectiveList().iterator(); while (iter.hasNext()) { - Entry entry = iter.next(); - if (entry.getValue().getPool() == pool) { - entriesByPath.remove(entry.getValue().getPath()); - iter.remove(); - } + CacheDirective directive = iter.next(); + directivesByPath.remove(directive.getPath()); + directivesById.remove(directive.getId()); + iter.remove(); } if (monitor != null) { monitor.kick(); } } - public BatchedListEntries + public BatchedListEntries listCachePools(FSPermissionChecker pc, String prevKey) { assert namesystem.hasReadLock(); final int NUM_PRE_ALLOCATED_ENTRIES = 16; - ArrayList results = - new ArrayList(NUM_PRE_ALLOCATED_ENTRIES); + ArrayList results = + new ArrayList(NUM_PRE_ALLOCATED_ENTRIES); SortedMap tailMap = cachePools.tailMap(prevKey, false); int numListed = 0; for (Entry cur : tailMap.entrySet()) { if (numListed++ >= maxListCachePoolsResponses) { - return new BatchedListEntries(results, true); + return new BatchedListEntries(results, true); } - results.add(cur.getValue().getInfo(pc)); + results.add(cur.getValue().getEntry(pc)); } - return new BatchedListEntries(results, false); + return new BatchedListEntries(results, false); } public void setCachedLocations(LocatedBlock block) { @@ -693,13 +729,6 @@ public final class CacheManager { for (Iterator iter = blockIds.iterator(); iter.hasNext(); ) { Block block = new Block(iter.next()); BlockInfo blockInfo = blockManager.getStoredBlock(block); - if (blockInfo.getGenerationStamp() < block.getGenerationStamp()) { - // The NameNode will eventually remove or update the out-of-date block. - // Until then, we pretend that it isn't cached. - LOG.warn("Genstamp in cache report disagrees with our genstamp for " + - block + ": expected genstamp " + blockInfo.getGenerationStamp()); - continue; - } if (!blockInfo.isComplete()) { LOG.warn("Ignoring block id " + block.getBlockId() + ", because " + "it is in not complete yet. It is in state " + @@ -743,9 +772,9 @@ public final class CacheManager { */ public void saveState(DataOutput out, String sdPath) throws IOException { - out.writeLong(nextEntryId); + out.writeLong(nextDirectiveId); savePools(out, sdPath); - saveEntries(out, sdPath); + saveDirectives(out, sdPath); } /** @@ -755,10 +784,10 @@ public final class CacheManager { * @throws IOException */ public void loadState(DataInput in) throws IOException { - nextEntryId = in.readLong(); - // pools need to be loaded first since entries point to their parent pool + nextDirectiveId = in.readLong(); + // pools need to be loaded first since directives point to their parent pool loadPools(in); - loadEntries(in); + loadDirectives(in); } /** @@ -773,7 +802,7 @@ public final class CacheManager { Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); out.writeInt(cachePools.size()); for (CachePool pool: cachePools.values()) { - pool.getInfo(null).writeTo(out); + pool.getInfo(true).writeTo(out); counter.increment(); } prog.endStep(Phase.SAVING_CHECKPOINT, step); @@ -782,19 +811,20 @@ public final class CacheManager { /* * Save cache entries to fsimage */ - private void saveEntries(DataOutput out, String sdPath) + private void saveDirectives(DataOutput out, String sdPath) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = new Step(StepType.CACHE_ENTRIES, sdPath); prog.beginStep(Phase.SAVING_CHECKPOINT, step); - prog.setTotal(Phase.SAVING_CHECKPOINT, step, entriesById.size()); + prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size()); Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); - out.writeInt(entriesById.size()); - for (CacheDirective entry: entriesById.values()) { - out.writeLong(entry.getEntryId()); - Text.writeString(out, entry.getPath()); - out.writeShort(entry.getReplication()); - Text.writeString(out, entry.getPool().getPoolName()); + out.writeInt(directivesById.size()); + for (CacheDirective directive : directivesById.values()) { + out.writeLong(directive.getId()); + Text.writeString(out, directive.getPath()); + out.writeShort(directive.getReplication()); + Text.writeString(out, directive.getPool().getPoolName()); + out.writeLong(directive.getExpiryTime()); counter.increment(); } prog.endStep(Phase.SAVING_CHECKPOINT, step); @@ -819,38 +849,42 @@ public final class CacheManager { } /** - * Load cache entries from the fsimage + * Load cache directives from the fsimage */ - private void loadEntries(DataInput in) throws IOException { + private void loadDirectives(DataInput in) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = new Step(StepType.CACHE_ENTRIES); prog.beginStep(Phase.LOADING_FSIMAGE, step); - int numberOfEntries = in.readInt(); - prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfEntries); + int numDirectives = in.readInt(); + prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); - for (int i = 0; i < numberOfEntries; i++) { - long entryId = in.readLong(); + for (int i = 0; i < numDirectives; i++) { + long directiveId = in.readLong(); String path = Text.readString(in); short replication = in.readShort(); String poolName = Text.readString(in); + long expiryTime = in.readLong(); // Get pool reference by looking it up in the map CachePool pool = cachePools.get(poolName); if (pool == null) { - throw new IOException("Entry refers to pool " + poolName + + throw new IOException("Directive refers to pool " + poolName + ", which does not exist."); } - CacheDirective entry = - new CacheDirective(entryId, path, replication, pool); - if (entriesById.put(entry.getEntryId(), entry) != null) { - throw new IOException("An entry with ID " + entry.getEntryId() + + CacheDirective directive = + new CacheDirective(directiveId, path, replication, expiryTime); + boolean addedDirective = pool.getDirectiveList().add(directive); + assert addedDirective; + if (directivesById.put(directive.getId(), directive) != null) { + throw new IOException("A directive with ID " + directive.getId() + " already exists"); } - List entries = entriesByPath.get(entry.getPath()); - if (entries == null) { - entries = new LinkedList(); - entriesByPath.put(entry.getPath(), entries); + List directives = + directivesByPath.get(directive.getPath()); + if (directives == null) { + directives = new LinkedList(); + directivesByPath.put(directive.getPath(), directives); } - entries.add(entry); + directives.add(directive); counter.increment(); } prog.endStep(Phase.LOADING_FSIMAGE, step); Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java Mon Dec 2 17:41:44 2013 @@ -26,9 +26,13 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.CacheDirective; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolStats; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.IntrusiveCollection; import com.google.common.base.Preconditions; @@ -69,6 +73,22 @@ public final class CachePool { private int weight; + public final static class DirectiveList + extends IntrusiveCollection { + private CachePool cachePool; + + private DirectiveList(CachePool cachePool) { + this.cachePool = cachePool; + } + + public CachePool getCachePool() { + return cachePool; + } + } + + @Nonnull + private final DirectiveList directiveList = new DirectiveList(this); + /** * Create a new cache pool based on a CachePoolInfo object and the defaults. * We will fill in information that was not supplied according to the @@ -171,7 +191,7 @@ public final class CachePool { * @return * Cache pool information. */ - private CachePoolInfo getInfo(boolean fullInfo) { + CachePoolInfo getInfo(boolean fullInfo) { CachePoolInfo info = new CachePoolInfo(poolName); if (!fullInfo) { return info; @@ -183,15 +203,28 @@ public final class CachePool { } /** + * Get statistics about this CachePool. + * + * @return Cache pool statistics. + */ + private CachePoolStats getStats() { + return new CachePoolStats.Builder(). + setBytesNeeded(0). + setBytesCached(0). + setFilesAffected(0). + build(); + } + + /** * Returns a CachePoolInfo describing this CachePool based on the permissions * of the calling user. Unprivileged users will see only minimal descriptive * information about the pool. * * @param pc Permission checker to be used to validate the user's permissions, * or null - * @return CachePoolInfo describing this CachePool + * @return CachePoolEntry describing this CachePool */ - public CachePoolInfo getInfo(FSPermissionChecker pc) { + public CachePoolEntry getEntry(FSPermissionChecker pc) { boolean hasPermission = true; if (pc != null) { try { @@ -200,7 +233,8 @@ public final class CachePool { hasPermission = false; } } - return getInfo(hasPermission); + return new CachePoolEntry(getInfo(hasPermission), + hasPermission ? getStats() : new CachePoolStats.Builder().build()); } public String toString() { @@ -212,4 +246,8 @@ public final class CachePool { append(", weight:").append(weight). append(" }").toString(); } + + public DirectiveList getDirectiveList() { + return directiveList; + } } Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Mon Dec 2 17:41:44 2013 @@ -36,8 +36,11 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException; +import org.apache.hadoop.hdfs.web.URLConnectionFactory; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticationException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -100,15 +103,22 @@ public class EditLogFileInputStream exte /** * Open an EditLogInputStream for the given URL. * - * @param url the url hosting the log - * @param startTxId the expected starting txid - * @param endTxId the expected ending txid - * @param inProgress whether the log is in-progress + * @param connectionFactory + * the URLConnectionFactory used to create the connection. + * @param url + * the url hosting the log + * @param startTxId + * the expected starting txid + * @param endTxId + * the expected ending txid + * @param inProgress + * whether the log is in-progress * @return a stream from which edits may be read */ - public static EditLogInputStream fromUrl(URL url, long startTxId, - long endTxId, boolean inProgress) { - return new EditLogFileInputStream(new URLLog(url), + public static EditLogInputStream fromUrl( + URLConnectionFactory connectionFactory, URL url, long startTxId, + long endTxId, boolean inProgress) { + return new EditLogFileInputStream(new URLLog(connectionFactory, url), startTxId, endTxId, inProgress); } @@ -365,8 +375,12 @@ public class EditLogFileInputStream exte private long advertisedSize = -1; private final static String CONTENT_LENGTH = "Content-Length"; + private final URLConnectionFactory connectionFactory; + private final boolean isSpnegoEnabled; - public URLLog(URL url) { + public URLLog(URLConnectionFactory connectionFactory, URL url) { + this.connectionFactory = connectionFactory; + this.isSpnegoEnabled = UserGroupInformation.isSecurityEnabled(); this.url = url; } @@ -376,8 +390,13 @@ public class EditLogFileInputStream exte new PrivilegedExceptionAction() { @Override public InputStream run() throws IOException { - HttpURLConnection connection = (HttpURLConnection) - SecurityUtil.openSecureHttpConnection(url); + HttpURLConnection connection; + try { + connection = (HttpURLConnection) + connectionFactory.openConnection(url, isSpnegoEnabled); + } catch (AuthenticationException e) { + throw new IOException(e); + } if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) { throw new HttpGetFailedException( Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Mon Dec 2 17:41:44 2013 @@ -24,7 +24,6 @@ import static org.apache.hadoop.util.Tim import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.jasper.compiler.JspUtil; /** * A generic abstract class to support journaling of edits logs into @@ -141,10 +140,10 @@ public abstract class EditLogOutputStrea } /** - * @return a short HTML snippet suitable for describing the current + * @return a short text snippet suitable for describing the current * status of the stream */ - public String generateHtmlReport() { - return JspUtil.escapeXml(this.toString()); + public String generateReport() { + return toString(); } } Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Dec 2 17:41:44 2013 @@ -87,11 +87,15 @@ import com.google.common.base.Preconditi * *************************************************/ public class FSDirectory implements Closeable { - private static INodeDirectoryWithQuota createRoot(FSNamesystem namesystem) { - final INodeDirectoryWithQuota r = new INodeDirectoryWithQuota( + private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) { + final INodeDirectory r = new INodeDirectory( INodeId.ROOT_INODE_ID, INodeDirectory.ROOT_NAME, - namesystem.createFsOwnerPermissions(new FsPermission((short) 0755))); + namesystem.createFsOwnerPermissions(new FsPermission((short) 0755)), + 0L); + r.addDirectoryWithQuotaFeature( + DirectoryWithQuotaFeature.DEFAULT_NAMESPACE_QUOTA, + DirectoryWithQuotaFeature.DEFAULT_DISKSPACE_QUOTA); final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(r); s.setSnapshotQuota(0); return s; @@ -107,7 +111,7 @@ public class FSDirectory implements Clos public final static String DOT_INODES_STRING = ".inodes"; public final static byte[] DOT_INODES = DFSUtil.string2Bytes(DOT_INODES_STRING); - INodeDirectoryWithQuota rootDir; + INodeDirectory rootDir; FSImage fsImage; private final FSNamesystem namesystem; private volatile boolean ready = false; @@ -202,7 +206,7 @@ public class FSDirectory implements Clos } /** @return the root directory inode. */ - public INodeDirectoryWithQuota getRoot() { + public INodeDirectory getRoot() { return rootDir; } @@ -452,8 +456,8 @@ public class FSDirectory implements Clos boolean unprotectedRemoveBlock(String path, INodeFile fileNode, Block block) throws IOException { - Preconditions.checkArgument(fileNode.isUnderConstruction()); // modify file-> block and blocksMap + // fileNode should be under construction boolean removed = fileNode.removeLastBlock(block); if (!removed) { return false; @@ -1800,9 +1804,8 @@ public class FSDirectory implements Clos final INode[] inodes = inodesInPath.getINodes(); for(int i=0; i < numOfINodes; i++) { if (inodes[i].isQuotaSet()) { // a directory with quota - INodeDirectoryWithQuota node = (INodeDirectoryWithQuota) inodes[i] - .asDirectory(); - node.addSpaceConsumed2Cache(nsDelta, dsDelta); + inodes[i].asDirectory().getDirectoryWithQuotaFeature() + .addSpaceConsumed2Cache(nsDelta, dsDelta); } } } @@ -2035,10 +2038,11 @@ public class FSDirectory implements Clos // Stop checking for quota when common ancestor is reached return; } - if (inodes[i].isQuotaSet()) { // a directory with quota + final DirectoryWithQuotaFeature q + = inodes[i].asDirectory().getDirectoryWithQuotaFeature(); + if (q != null) { // a directory with quota try { - ((INodeDirectoryWithQuota) inodes[i].asDirectory()).verifyQuota( - nsDelta, dsDelta); + q.verifyQuota(nsDelta, dsDelta); } catch (QuotaExceededException e) { e.setPathName(getFullPathName(inodes, i)); throw e; @@ -2385,35 +2389,14 @@ public class FSDirectory implements Clos if (dsQuota == HdfsConstants.QUOTA_DONT_SET) { dsQuota = oldDsQuota; } + if (oldNsQuota == nsQuota && oldDsQuota == dsQuota) { + return null; + } final Snapshot latest = iip.getLatestSnapshot(); - if (dirNode instanceof INodeDirectoryWithQuota) { - INodeDirectoryWithQuota quotaNode = (INodeDirectoryWithQuota) dirNode; - Quota.Counts counts = null; - if (!quotaNode.isQuotaSet()) { - // dirNode must be an INodeDirectoryWithSnapshot whose quota has not - // been set yet - counts = quotaNode.computeQuotaUsage(); - } - // a directory with quota; so set the quota to the new value - quotaNode.setQuota(nsQuota, dsQuota); - if (quotaNode.isQuotaSet() && counts != null) { - quotaNode.setSpaceConsumed(counts.get(Quota.NAMESPACE), - counts.get(Quota.DISKSPACE)); - } else if (!quotaNode.isQuotaSet() && latest == null) { - // do not replace the node if the node is a snapshottable directory - // without snapshots - if (!(quotaNode instanceof INodeDirectoryWithSnapshot)) { - // will not come here for root because root is snapshottable and - // root's nsQuota is always set - return quotaNode.replaceSelf4INodeDirectory(inodeMap); - } - } - } else { - // a non-quota directory; so replace it with a directory with quota - return dirNode.replaceSelf4Quota(latest, nsQuota, dsQuota, inodeMap); - } - return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null; + dirNode = dirNode.recordModification(latest, inodeMap); + dirNode.setQuota(nsQuota, dsQuota); + return dirNode; } } @@ -2442,7 +2425,8 @@ public class FSDirectory implements Clos long totalInodes() { readLock(); try { - return rootDir.numItemsInTree(); + return rootDir.getDirectoryWithQuotaFeature().getSpaceConsumed() + .get(Quota.NAMESPACE); } finally { readUnlock(); } Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Mon Dec 2 17:41:44 2013 @@ -953,7 +953,11 @@ public class FSEditLog implements LogsPu .setSnapshotRoot(path); logEdit(op); } - + + /** + * Log a CacheDirectiveInfo returned from + * {@link CacheManager#addDirective(CacheDirectiveInfo, FSPermissionChecker)} + */ void logAddCacheDirectiveInfo(CacheDirectiveInfo directive, boolean toLogRpcIds) { AddCacheDirectiveInfoOp op = Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Mon Dec 2 17:41:44 2013 @@ -636,17 +636,17 @@ public class FSEditLogLoader { fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId); break; } - case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: { + case OP_ADD_CACHE_DIRECTIVE: { AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op; CacheDirectiveInfo result = fsNamesys. - getCacheManager().addDirective(addOp.directive, null); + getCacheManager().addDirectiveFromEditLog(addOp.directive); if (toAddRetryCache) { Long id = result.getId(); fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId, id); } break; } - case OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE: { + case OP_MODIFY_CACHE_DIRECTIVE: { ModifyCacheDirectiveInfoOp modifyOp = (ModifyCacheDirectiveInfoOp) op; fsNamesys.getCacheManager().modifyDirective( @@ -656,7 +656,7 @@ public class FSEditLogLoader { } break; } - case OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE: { + case OP_REMOVE_CACHE_DIRECTIVE: { RemoveCacheDirectiveInfoOp removeOp = (RemoveCacheDirectiveInfoOp) op; fsNamesys.getCacheManager().removeDirective(removeOp.id, null); Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Mon Dec 2 17:41:44 2013 @@ -18,9 +18,8 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL; -import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_PATH_BASED_CACHE_DIRECTIVE; -import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN; @@ -35,10 +34,11 @@ import static org.apache.hadoop.hdfs.ser import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_DIRECTIVE; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_POOL; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL; -import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT; @@ -64,6 +64,7 @@ import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Date; import java.util.EnumMap; import java.util.List; import java.util.zip.CheckedInputStream; @@ -81,12 +82,12 @@ import org.apache.hadoop.fs.permission.P import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DeprecatedUTF8; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature; -import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.util.XMLUtils; import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException; @@ -109,7 +110,6 @@ import org.xml.sax.helpers.AttributesImp import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; /** * Helper classes for reading the ops from an InputStream. @@ -165,11 +165,11 @@ public abstract class FSEditLogOp { inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp()); inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op()); inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp()); - inst.put(OP_ADD_PATH_BASED_CACHE_DIRECTIVE, + inst.put(OP_ADD_CACHE_DIRECTIVE, new AddCacheDirectiveInfoOp()); - inst.put(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE, + inst.put(OP_MODIFY_CACHE_DIRECTIVE, new ModifyCacheDirectiveInfoOp()); - inst.put(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE, + inst.put(OP_REMOVE_CACHE_DIRECTIVE, new RemoveCacheDirectiveInfoOp()); inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp()); inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp()); @@ -2874,12 +2874,12 @@ public abstract class FSEditLogOp { CacheDirectiveInfo directive; public AddCacheDirectiveInfoOp() { - super(OP_ADD_PATH_BASED_CACHE_DIRECTIVE); + super(OP_ADD_CACHE_DIRECTIVE); } static AddCacheDirectiveInfoOp getInstance(OpInstanceCache cache) { return (AddCacheDirectiveInfoOp) cache - .get(OP_ADD_PATH_BASED_CACHE_DIRECTIVE); + .get(OP_ADD_CACHE_DIRECTIVE); } public AddCacheDirectiveInfoOp setDirective( @@ -2889,6 +2889,7 @@ public abstract class FSEditLogOp { assert(directive.getPath() != null); assert(directive.getReplication() != null); assert(directive.getPool() != null); + assert(directive.getExpiration() != null); return this; } @@ -2898,11 +2899,13 @@ public abstract class FSEditLogOp { String path = FSImageSerialization.readString(in); short replication = FSImageSerialization.readShort(in); String pool = FSImageSerialization.readString(in); + long expiryTime = FSImageSerialization.readLong(in); directive = new CacheDirectiveInfo.Builder(). setId(id). setPath(new Path(path)). setReplication(replication). setPool(pool). + setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiryTime)). build(); readRpcIds(in, logVersion); } @@ -2913,6 +2916,8 @@ public abstract class FSEditLogOp { FSImageSerialization.writeString(directive.getPath().toUri().getPath(), out); FSImageSerialization.writeShort(directive.getReplication(), out); FSImageSerialization.writeString(directive.getPool(), out); + FSImageSerialization.writeLong( + directive.getExpiration().getMillis(), out); writeRpcIds(rpcClientId, rpcCallId, out); } @@ -2925,6 +2930,8 @@ public abstract class FSEditLogOp { XMLUtils.addSaxString(contentHandler, "REPLICATION", Short.toString(directive.getReplication())); XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool()); + XMLUtils.addSaxString(contentHandler, "EXPIRATION", + "" + directive.getExpiration().getMillis()); appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } @@ -2935,6 +2942,8 @@ public abstract class FSEditLogOp { setPath(new Path(st.getValue("PATH"))). setReplication(Short.parseShort(st.getValue("REPLICATION"))). setPool(st.getValue("POOL")). + setExpiration(CacheDirectiveInfo.Expiration.newAbsolute( + Long.parseLong(st.getValue("EXPIRATION")))). build(); readRpcIdsFromXml(st); } @@ -2946,7 +2955,8 @@ public abstract class FSEditLogOp { builder.append("id=" + directive.getId() + ","); builder.append("path=" + directive.getPath().toUri().getPath() + ","); builder.append("replication=" + directive.getReplication() + ","); - builder.append("pool=" + directive.getPool()); + builder.append("pool=" + directive.getPool() + ","); + builder.append("expiration=" + directive.getExpiration().getMillis()); appendRpcIdsToString(builder, rpcClientId, rpcCallId); builder.append("]"); return builder.toString(); @@ -2961,12 +2971,12 @@ public abstract class FSEditLogOp { CacheDirectiveInfo directive; public ModifyCacheDirectiveInfoOp() { - super(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE); + super(OP_MODIFY_CACHE_DIRECTIVE); } static ModifyCacheDirectiveInfoOp getInstance(OpInstanceCache cache) { return (ModifyCacheDirectiveInfoOp) cache - .get(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE); + .get(OP_MODIFY_CACHE_DIRECTIVE); } public ModifyCacheDirectiveInfoOp setDirective( @@ -2991,7 +3001,12 @@ public abstract class FSEditLogOp { if ((flags & 0x4) != 0) { builder.setPool(FSImageSerialization.readString(in)); } - if ((flags & ~0x7) != 0) { + if ((flags & 0x8) != 0) { + builder.setExpiration( + CacheDirectiveInfo.Expiration.newAbsolute( + FSImageSerialization.readLong(in))); + } + if ((flags & ~0xF) != 0) { throw new IOException("unknown flags set in " + "ModifyCacheDirectiveInfoOp: " + flags); } @@ -3005,7 +3020,8 @@ public abstract class FSEditLogOp { byte flags = (byte)( ((directive.getPath() != null) ? 0x1 : 0) | ((directive.getReplication() != null) ? 0x2 : 0) | - ((directive.getPool() != null) ? 0x4 : 0) + ((directive.getPool() != null) ? 0x4 : 0) | + ((directive.getExpiration() != null) ? 0x8 : 0) ); out.writeByte(flags); if (directive.getPath() != null) { @@ -3018,6 +3034,10 @@ public abstract class FSEditLogOp { if (directive.getPool() != null) { FSImageSerialization.writeString(directive.getPool(), out); } + if (directive.getExpiration() != null) { + FSImageSerialization.writeLong(directive.getExpiration().getMillis(), + out); + } writeRpcIds(rpcClientId, rpcCallId, out); } @@ -3036,6 +3056,10 @@ public abstract class FSEditLogOp { if (directive.getPool() != null) { XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool()); } + if (directive.getExpiration() != null) { + XMLUtils.addSaxString(contentHandler, "EXPIRATION", + "" + directive.getExpiration().getMillis()); + } appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } @@ -3056,6 +3080,11 @@ public abstract class FSEditLogOp { if (pool != null) { builder.setPool(pool); } + String expiryTime = st.getValueOrNull("EXPIRATION"); + if (expiryTime != null) { + builder.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute( + Long.parseLong(expiryTime))); + } this.directive = builder.build(); readRpcIdsFromXml(st); } @@ -3075,6 +3104,10 @@ public abstract class FSEditLogOp { if (directive.getPool() != null) { builder.append(",").append("pool=").append(directive.getPool()); } + if (directive.getExpiration() != null) { + builder.append(",").append("expiration="). + append(directive.getExpiration().getMillis()); + } appendRpcIdsToString(builder, rpcClientId, rpcCallId); builder.append("]"); return builder.toString(); @@ -3089,12 +3122,12 @@ public abstract class FSEditLogOp { long id; public RemoveCacheDirectiveInfoOp() { - super(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE); + super(OP_REMOVE_CACHE_DIRECTIVE); } static RemoveCacheDirectiveInfoOp getInstance(OpInstanceCache cache) { return (RemoveCacheDirectiveInfoOp) cache - .get(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE); + .get(OP_REMOVE_CACHE_DIRECTIVE); } public RemoveCacheDirectiveInfoOp setId(long id) { @@ -3162,7 +3195,7 @@ public abstract class FSEditLogOp { @Override public void writeFields(DataOutputStream out) throws IOException { - info .writeTo(out); + info.writeTo(out); writeRpcIds(rpcClientId, rpcCallId, out); } Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Mon Dec 2 17:41:44 2013 @@ -64,12 +64,12 @@ public enum FSEditLogOpCodes { OP_DISALLOW_SNAPSHOT ((byte) 30), OP_SET_GENSTAMP_V2 ((byte) 31), OP_ALLOCATE_BLOCK_ID ((byte) 32), - OP_ADD_PATH_BASED_CACHE_DIRECTIVE ((byte) 33), - OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE ((byte) 34), + OP_ADD_CACHE_DIRECTIVE ((byte) 33), + OP_REMOVE_CACHE_DIRECTIVE ((byte) 34), OP_ADD_CACHE_POOL ((byte) 35), OP_MODIFY_CACHE_POOL ((byte) 36), OP_REMOVE_CACHE_POOL ((byte) 37), - OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE ((byte) 38); + OP_MODIFY_CACHE_DIRECTIVE ((byte) 38); private byte opCode; Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1547122&r1=1547121&r2=1547122&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Mon Dec 2 17:41:44 2013 @@ -755,7 +755,7 @@ public class FSImage implements Closeabl * This is an update of existing state of the filesystem and does not * throw QuotaExceededException. */ - static void updateCountForQuota(INodeDirectoryWithQuota root) { + static void updateCountForQuota(INodeDirectory root) { updateCountForQuotaRecursively(root, Quota.Counts.newInstance()); } @@ -795,7 +795,7 @@ public class FSImage implements Closeabl + " quota = " + dsQuota + " < consumed = " + diskspace); } - ((INodeDirectoryWithQuota)dir).setSpaceConsumed(namespace, diskspace); + dir.getDirectoryWithQuotaFeature().setSpaceConsumed(namespace, diskspace); } }