Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B7389200B81 for ; Mon, 29 Aug 2016 17:32:56 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B5C3C160AD9; Mon, 29 Aug 2016 15:32:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8428D160AD1 for ; Mon, 29 Aug 2016 17:32:54 +0200 (CEST) Received: (qmail 80575 invoked by uid 500); 29 Aug 2016 15:32:52 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 79900 invoked by uid 99); 29 Aug 2016 15:32:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Aug 2016 15:32:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DA1C4E3889; Mon, 29 Aug 2016 15:32:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Mon, 29 Aug 2016 15:33:01 -0000 Message-Id: In-Reply-To: <8d3cfb4f37fe46bc8fe62359e437de8c@git.apache.org> References: <8d3cfb4f37fe46bc8fe62359e437de8c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/52] [partial] hbase-site git commit: Published site at 950d547dae684155020edb879a343bee1bf18e97. archived-at: Mon, 29 Aug 2016 15:32:56 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/45a663dd/devapidocs/src-html/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.RefreshCacheTask.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.RefreshCacheTask.html b/devapidocs/src-html/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.RefreshCacheTask.html index ac4f63e..9ec3893 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.RefreshCacheTask.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.RefreshCacheTask.html @@ -35,358 +35,366 @@ 027import java.util.Set; 028import java.util.Timer; 029import java.util.TimerTask; -030 -031import com.google.common.annotations.VisibleForTesting; -032import com.google.common.collect.Lists; -033import org.apache.commons.logging.Log; -034import org.apache.commons.logging.LogFactory; -035import org.apache.hadoop.hbase.classification.InterfaceAudience; -036import org.apache.hadoop.hbase.classification.InterfaceStability; -037import org.apache.hadoop.conf.Configuration; -038import org.apache.hadoop.fs.FileStatus; -039import org.apache.hadoop.fs.FileSystem; -040import org.apache.hadoop.fs.Path; -041import org.apache.hadoop.hbase.Stoppable; -042import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; -043import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; -044import org.apache.hadoop.hbase.util.FSUtils; -045 -046/** -047 * Intelligently keep track of all the files for all the snapshots. -048 * <p> -049 * A cache of files is kept to avoid querying the {@link FileSystem} frequently. If there is a cache -050 * miss the directory modification time is used to ensure that we don't rescan directories that we -051 * already have in cache. We only check the modification times of the snapshot directories -052 * (/hbase/.snapshot/[snapshot_name]) to determine if the files need to be loaded into the cache. -053 * <p> -054 * New snapshots will be added to the cache and deleted snapshots will be removed when we refresh -055 * the cache. If the files underneath a snapshot directory are changed, but not the snapshot itself, -056 * we will ignore updates to that snapshot's files. -057 * <p> -058 * This is sufficient because each snapshot has its own directory and is added via an atomic rename -059 * <i>once</i>, when the snapshot is created. We don't need to worry about the data in the snapshot -060 * being run. -061 * <p> -062 * Further, the cache is periodically refreshed ensure that files in snapshots that were deleted are -063 * also removed from the cache. -064 * <p> -065 * A {@link SnapshotFileCache.SnapshotFileInspector} must be passed when creating <tt>this</tt> to -066 * allow extraction of files under /hbase/.snapshot/[snapshot name] directory, for each snapshot. -067 * This allows you to only cache files under, for instance, all the logs in the .logs directory or -068 * all the files under all the regions. -069 * <p> -070 * <tt>this</tt> also considers all running snapshots (those under /hbase/.snapshot/.tmp) as valid -071 * snapshots and will attempt to cache files from those snapshots as well. -072 * <p> -073 * Queries about a given file are thread-safe with respect to multiple queries and cache refreshes. -074 */ -075@InterfaceAudience.Private -076@InterfaceStability.Evolving -077public class SnapshotFileCache implements Stoppable { -078 interface SnapshotFileInspector { -079 /** -080 * Returns a collection of file names needed by the snapshot. -081 * @param snapshotDir {@link Path} to the snapshot directory to scan. -082 * @return the collection of file names needed by the snapshot. -083 */ -084 Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException; -085 } -086 -087 private static final Log LOG = LogFactory.getLog(SnapshotFileCache.class); -088 private volatile boolean stop = false; -089 private final FileSystem fs; -090 private final SnapshotFileInspector fileInspector; -091 private final Path snapshotDir; -092 private final Set<String> cache = new HashSet<String>(); -093 /** -094 * This is a helper map of information about the snapshot directories so we don't need to rescan -095 * them if they haven't changed since the last time we looked. -096 */ -097 private final Map<String, SnapshotDirectoryInfo> snapshots = -098 new HashMap<String, SnapshotDirectoryInfo>(); -099 private final Timer refreshTimer; -100 -101 private long lastModifiedTime = Long.MIN_VALUE; -102 -103 /** -104 * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the -105 * filesystem. -106 * <p> -107 * Immediately loads the file cache. -108 * @param conf to extract the configured {@link FileSystem} where the snapshots are stored and -109 * hbase root directory -110 * @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed -111 * @param refreshThreadName name of the cache refresh thread -112 * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files. -113 * @throws IOException if the {@link FileSystem} or root directory cannot be loaded -114 */ -115 public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName, -116 SnapshotFileInspector inspectSnapshotFiles) throws IOException { -117 this(FSUtils.getCurrentFileSystem(conf), FSUtils.getRootDir(conf), 0, cacheRefreshPeriod, -118 refreshThreadName, inspectSnapshotFiles); -119 } -120 -121 /** -122 * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the -123 * filesystem -124 * @param fs {@link FileSystem} where the snapshots are stored -125 * @param rootDir hbase root directory -126 * @param cacheRefreshPeriod period (ms) with which the cache should be refreshed -127 * @param cacheRefreshDelay amount of time to wait for the cache to be refreshed -128 * @param refreshThreadName name of the cache refresh thread -129 * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files. -130 */ -131 public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod, -132 long cacheRefreshDelay, String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) { -133 this.fs = fs; -134 this.fileInspector = inspectSnapshotFiles; -135 this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir); -136 // periodically refresh the file cache to make sure we aren't superfluously saving files. -137 this.refreshTimer = new Timer(refreshThreadName, true); -138 this.refreshTimer.scheduleAtFixedRate(new RefreshCacheTask(), cacheRefreshDelay, -139 cacheRefreshPeriod); -140 } -141 -142 /** -143 * Trigger a cache refresh, even if its before the next cache refresh. Does not affect pending -144 * cache refreshes. -145 * <p> -146 * Blocks until the cache is refreshed. -147 * <p> -148 * Exposed for TESTING. -149 */ -150 public void triggerCacheRefreshForTesting() { -151 try { -152 SnapshotFileCache.this.refreshCache(); -153 } catch (IOException e) { -154 LOG.warn("Failed to refresh snapshot hfile cache!", e); -155 } -156 LOG.debug("Current cache:" + cache); -157 } -158 -159 /** -160 * Check to see if any of the passed file names is contained in any of the snapshots. -161 * First checks an in-memory cache of the files to keep. If its not in the cache, then the cache -162 * is refreshed and the cache checked again for that file. -163 * This ensures that we never return files that exist. -164 * <p> -165 * Note this may lead to periodic false positives for the file being referenced. Periodically, the -166 * cache is refreshed even if there are no requests to ensure that the false negatives get removed -167 * eventually. For instance, suppose you have a file in the snapshot and it gets loaded into the -168 * cache. Then at some point later that snapshot is deleted. If the cache has not been refreshed -169 * at that point, cache will still think the file system contains that file and return -170 * <tt>true</tt>, even if it is no longer present (false positive). However, if the file never was -171 * on the filesystem, we will never find it and always return <tt>false</tt>. -172 * @param files file to check, NOTE: Relies that files are loaded from hdfs before method -173 * is called (NOT LAZY) -174 * @return <tt>unReferencedFiles</tt> the collection of files that do not have snapshot references -175 * @throws IOException if there is an unexpected error reaching the filesystem. -176 */ -177 // XXX this is inefficient to synchronize on the method, when what we really need to guard against -178 // is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the -179 // cache, but that seems overkill at the moment and isn't necessarily a bottleneck. -180 public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files) -181 throws IOException { -182 List<FileStatus> unReferencedFiles = Lists.newArrayList(); -183 List<String> snapshotsInProgress = null; -184 boolean refreshed = false; -185 for (FileStatus file : files) { -186 String fileName = file.getPath().getName(); -187 if (!refreshed && !cache.contains(fileName)) { -188 refreshCache(); -189 refreshed = true; -190 } -191 if (cache.contains(fileName)) { -192 continue; -193 } -194 if (snapshotsInProgress == null) { -195 snapshotsInProgress = getSnapshotsInProgress(); -196 } -197 if (snapshotsInProgress.contains(fileName)) { -198 continue; -199 } -200 unReferencedFiles.add(file); -201 } -202 return unReferencedFiles; -203 } -204 -205 private synchronized void refreshCache() throws IOException { -206 long lastTimestamp = Long.MAX_VALUE; -207 boolean hasChanges = false; -208 -209 // get the status of the snapshots directory and check if it is has changes -210 try { -211 FileStatus dirStatus = fs.getFileStatus(snapshotDir); -212 lastTimestamp = dirStatus.getModificationTime(); -213 hasChanges |= (lastTimestamp >= lastModifiedTime); -214 } catch (FileNotFoundException e) { -215 if (this.cache.size() > 0) { -216 LOG.error("Snapshot directory: " + snapshotDir + " doesn't exist"); -217 } -218 return; -219 } -220 -221 // get the status of the snapshots temporary directory and check if it has changes -222 // The top-level directory timestamp is not updated, so we have to check the inner-level. -223 try { -224 Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME); -225 FileStatus tempDirStatus = fs.getFileStatus(snapshotTmpDir); -226 lastTimestamp = Math.min(lastTimestamp, tempDirStatus.getModificationTime()); -227 hasChanges |= (lastTimestamp >= lastModifiedTime); -228 if (!hasChanges) { -229 FileStatus[] tmpSnapshots = FSUtils.listStatus(fs, snapshotDir); -230 if (tmpSnapshots != null) { -231 for (FileStatus dirStatus: tmpSnapshots) { -232 lastTimestamp = Math.min(lastTimestamp, dirStatus.getModificationTime()); -233 } -234 hasChanges |= (lastTimestamp >= lastModifiedTime); -235 } -236 } -237 } catch (FileNotFoundException e) { -238 // Nothing todo, if the tmp dir is empty -239 } -240 -241 // if the snapshot directory wasn't modified since we last check, we are done -242 if (!hasChanges) { -243 return; -244 } -245 -246 // directory was modified, so we need to reload our cache -247 // there could be a slight race here where we miss the cache, check the directory modification -248 // time, then someone updates the directory, causing us to not scan the directory again. -249 // However, snapshot directories are only created once, so this isn't an issue. -250 -251 // 1. update the modified time -252 this.lastModifiedTime = lastTimestamp; -253 -254 // 2.clear the cache -255 this.cache.clear(); -256 Map<String, SnapshotDirectoryInfo> known = new HashMap<String, SnapshotDirectoryInfo>(); -257 -258 // 3. check each of the snapshot directories -259 FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir); -260 if (snapshots == null) { -261 // remove all the remembered snapshots because we don't have any left -262 if (LOG.isDebugEnabled() && this.snapshots.size() > 0) { -263 LOG.debug("No snapshots on-disk, cache empty"); -264 } -265 this.snapshots.clear(); -266 return; -267 } -268 -269 // 3.1 iterate through the on-disk snapshots -270 for (FileStatus snapshot : snapshots) { -271 String name = snapshot.getPath().getName(); -272 // its not the tmp dir, -273 if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) { -274 SnapshotDirectoryInfo files = this.snapshots.remove(name); -275 // 3.1.1 if we don't know about the snapshot or its been modified, we need to update the -276 // files the latter could occur where I create a snapshot, then delete it, and then make a -277 // new snapshot with the same name. We will need to update the cache the information from -278 // that new snapshot, even though it has the same name as the files referenced have -279 // probably changed. -280 if (files == null || files.hasBeenModified(snapshot.getModificationTime())) { -281 // get all files for the snapshot and create a new info -282 Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshot.getPath()); -283 files = new SnapshotDirectoryInfo(snapshot.getModificationTime(), storedFiles); -284 } -285 // 3.2 add all the files to cache -286 this.cache.addAll(files.getFiles()); -287 known.put(name, files); -288 } -289 } -290 -291 // 4. set the snapshots we are tracking -292 this.snapshots.clear(); -293 this.snapshots.putAll(known); -294 } -295 -296 @VisibleForTesting List<String> getSnapshotsInProgress() throws IOException { -297 List<String> snapshotInProgress = Lists.newArrayList(); -298 // only add those files to the cache, but not to the known snapshots -299 Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME); -300 // only add those files to the cache, but not to the known snapshots -301 FileStatus[] running = FSUtils.listStatus(fs, snapshotTmpDir); -302 if (running != null) { -303 for (FileStatus run : running) { -304 try { -305 snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(run.getPath())); -306 } catch (CorruptedSnapshotException e) { -307 // See HBASE-16464 -308 if (e.getCause() instanceof FileNotFoundException) { -309 // If the snapshot is not in progress, we will delete it -310 if (!fs.exists(new Path(run.getPath(), -311 SnapshotDescriptionUtils.SNAPSHOT_IN_PROGRESS))) { -312 fs.delete(run.getPath(), true); -313 LOG.warn("delete the " + run.getPath() + " due to exception:", e.getCause()); -314 } -315 } else { -316 throw e; -317 } -318 } -319 } -320 } -321 return snapshotInProgress; -322 } -323 -324 /** -325 * Simple helper task that just periodically attempts to refresh the cache -326 */ -327 public class RefreshCacheTask extends TimerTask { -328 @Override -329 public void run() { -330 try { -331 SnapshotFileCache.this.refreshCache(); -332 } catch (IOException e) { -333 LOG.warn("Failed to refresh snapshot hfile cache!", e); -334 } -335 } -336 } -337 -338 @Override -339 public void stop(String why) { -340 if (!this.stop) { -341 this.stop = true; -342 this.refreshTimer.cancel(); +030import java.util.concurrent.locks.ReentrantLock; +031 +032import com.google.common.annotations.VisibleForTesting; +033import com.google.common.collect.Lists; +034import org.apache.commons.logging.Log; +035import org.apache.commons.logging.LogFactory; +036import org.apache.hadoop.hbase.classification.InterfaceAudience; +037import org.apache.hadoop.hbase.classification.InterfaceStability; +038import org.apache.hadoop.conf.Configuration; +039import org.apache.hadoop.fs.FileStatus; +040import org.apache.hadoop.fs.FileSystem; +041import org.apache.hadoop.fs.Path; +042import org.apache.hadoop.hbase.Stoppable; +043import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; +044import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +045import org.apache.hadoop.hbase.util.FSUtils; +046 +047/** +048 * Intelligently keep track of all the files for all the snapshots. +049 * <p> +050 * A cache of files is kept to avoid querying the {@link FileSystem} frequently. If there is a cache +051 * miss the directory modification time is used to ensure that we don't rescan directories that we +052 * already have in cache. We only check the modification times of the snapshot directories +053 * (/hbase/.snapshot/[snapshot_name]) to determine if the files need to be loaded into the cache. +054 * <p> +055 * New snapshots will be added to the cache and deleted snapshots will be removed when we refresh +056 * the cache. If the files underneath a snapshot directory are changed, but not the snapshot itself, +057 * we will ignore updates to that snapshot's files. +058 * <p> +059 * This is sufficient because each snapshot has its own directory and is added via an atomic rename +060 * <i>once</i>, when the snapshot is created. We don't need to worry about the data in the snapshot +061 * being run. +062 * <p> +063 * Further, the cache is periodically refreshed ensure that files in snapshots that were deleted are +064 * also removed from the cache. +065 * <p> +066 * A {@link SnapshotFileCache.SnapshotFileInspector} must be passed when creating <tt>this</tt> to +067 * allow extraction of files under /hbase/.snapshot/[snapshot name] directory, for each snapshot. +068 * This allows you to only cache files under, for instance, all the logs in the .logs directory or +069 * all the files under all the regions. +070 * <p> +071 * <tt>this</tt> also considers all running snapshots (those under /hbase/.snapshot/.tmp) as valid +072 * snapshots and will attempt to cache files from those snapshots as well. +073 * <p> +074 * Queries about a given file are thread-safe with respect to multiple queries and cache refreshes. +075 */ +076@InterfaceAudience.Private +077@InterfaceStability.Evolving +078public class SnapshotFileCache implements Stoppable { +079 interface SnapshotFileInspector { +080 /** +081 * Returns a collection of file names needed by the snapshot. +082 * @param snapshotDir {@link Path} to the snapshot directory to scan. +083 * @return the collection of file names needed by the snapshot. +084 */ +085 Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException; +086 } +087 +088 private static final Log LOG = LogFactory.getLog(SnapshotFileCache.class); +089 private volatile boolean stop = false; +090 private final FileSystem fs; +091 private final SnapshotFileInspector fileInspector; +092 private final Path snapshotDir; +093 private final Set<String> cache = new HashSet<String>(); +094 /** +095 * This is a helper map of information about the snapshot directories so we don't need to rescan +096 * them if they haven't changed since the last time we looked. +097 */ +098 private final Map<String, SnapshotDirectoryInfo> snapshots = +099 new HashMap<String, SnapshotDirectoryInfo>(); +100 private final Timer refreshTimer; +101 +102 private long lastModifiedTime = Long.MIN_VALUE; +103 +104 /** +105 * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the +106 * filesystem. +107 * <p> +108 * Immediately loads the file cache. +109 * @param conf to extract the configured {@link FileSystem} where the snapshots are stored and +110 * hbase root directory +111 * @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed +112 * @param refreshThreadName name of the cache refresh thread +113 * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files. +114 * @throws IOException if the {@link FileSystem} or root directory cannot be loaded +115 */ +116 public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName, +117 SnapshotFileInspector inspectSnapshotFiles) throws IOException { +118 this(FSUtils.getCurrentFileSystem(conf), FSUtils.getRootDir(conf), 0, cacheRefreshPeriod, +119 refreshThreadName, inspectSnapshotFiles); +120 } +121 +122 /** +123 * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the +124 * filesystem +125 * @param fs {@link FileSystem} where the snapshots are stored +126 * @param rootDir hbase root directory +127 * @param cacheRefreshPeriod period (ms) with which the cache should be refreshed +128 * @param cacheRefreshDelay amount of time to wait for the cache to be refreshed +129 * @param refreshThreadName name of the cache refresh thread +130 * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files. +131 */ +132 public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod, +133 long cacheRefreshDelay, String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) { +134 this.fs = fs; +135 this.fileInspector = inspectSnapshotFiles; +136 this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir); +137 // periodically refresh the file cache to make sure we aren't superfluously saving files. +138 this.refreshTimer = new Timer(refreshThreadName, true); +139 this.refreshTimer.scheduleAtFixedRate(new RefreshCacheTask(), cacheRefreshDelay, +140 cacheRefreshPeriod); +141 } +142 +143 /** +144 * Trigger a cache refresh, even if its before the next cache refresh. Does not affect pending +145 * cache refreshes. +146 * <p> +147 * Blocks until the cache is refreshed. +148 * <p> +149 * Exposed for TESTING. +150 */ +151 public void triggerCacheRefreshForTesting() { +152 try { +153 SnapshotFileCache.this.refreshCache(); +154 } catch (IOException e) { +155 LOG.warn("Failed to refresh snapshot hfile cache!", e); +156 } +157 LOG.debug("Current cache:" + cache); +158 } +159 +160 /** +161 * Check to see if any of the passed file names is contained in any of the snapshots. +162 * First checks an in-memory cache of the files to keep. If its not in the cache, then the cache +163 * is refreshed and the cache checked again for that file. +164 * This ensures that we never return files that exist. +165 * <p> +166 * Note this may lead to periodic false positives for the file being referenced. Periodically, the +167 * cache is refreshed even if there are no requests to ensure that the false negatives get removed +168 * eventually. For instance, suppose you have a file in the snapshot and it gets loaded into the +169 * cache. Then at some point later that snapshot is deleted. If the cache has not been refreshed +170 * at that point, cache will still think the file system contains that file and return +171 * <tt>true</tt>, even if it is no longer present (false positive). However, if the file never was +172 * on the filesystem, we will never find it and always return <tt>false</tt>. +173 * @param files file to check, NOTE: Relies that files are loaded from hdfs before method +174 * is called (NOT LAZY) +175 * @return <tt>unReferencedFiles</tt> the collection of files that do not have snapshot references +176 * @throws IOException if there is an unexpected error reaching the filesystem. +177 */ +178 // XXX this is inefficient to synchronize on the method, when what we really need to guard against +179 // is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the +180 // cache, but that seems overkill at the moment and isn't necessarily a bottleneck. +181 public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files, +182 final SnapshotManager snapshotManager) +183 throws IOException { +184 List<FileStatus> unReferencedFiles = Lists.newArrayList(); +185 List<String> snapshotsInProgress = null; +186 boolean refreshed = false; +187 for (FileStatus file : files) { +188 String fileName = file.getPath().getName(); +189 if (!refreshed && !cache.contains(fileName)) { +190 refreshCache(); +191 refreshed = true; +192 } +193 if (cache.contains(fileName)) { +194 continue; +195 } +196 if (snapshotsInProgress == null) { +197 snapshotsInProgress = getSnapshotsInProgress(snapshotManager); +198 } +199 if (snapshotsInProgress.contains(fileName)) { +200 continue; +201 } +202 unReferencedFiles.add(file); +203 } +204 return unReferencedFiles; +205 } +206 +207 private synchronized void refreshCache() throws IOException { +208 long lastTimestamp = Long.MAX_VALUE; +209 boolean hasChanges = false; +210 +211 // get the status of the snapshots directory and check if it is has changes +212 try { +213 FileStatus dirStatus = fs.getFileStatus(snapshotDir); +214 lastTimestamp = dirStatus.getModificationTime(); +215 hasChanges |= (lastTimestamp >= lastModifiedTime); +216 } catch (FileNotFoundException e) { +217 if (this.cache.size() > 0) { +218 LOG.error("Snapshot directory: " + snapshotDir + " doesn't exist"); +219 } +220 return; +221 } +222 +223 // get the status of the snapshots temporary directory and check if it has changes +224 // The top-level directory timestamp is not updated, so we have to check the inner-level. +225 try { +226 Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME); +227 FileStatus tempDirStatus = fs.getFileStatus(snapshotTmpDir); +228 lastTimestamp = Math.min(lastTimestamp, tempDirStatus.getModificationTime()); +229 hasChanges |= (lastTimestamp >= lastModifiedTime); +230 if (!hasChanges) { +231 FileStatus[] tmpSnapshots = FSUtils.listStatus(fs, snapshotDir); +232 if (tmpSnapshots != null) { +233 for (FileStatus dirStatus: tmpSnapshots) { +234 lastTimestamp = Math.min(lastTimestamp, dirStatus.getModificationTime()); +235 } +236 hasChanges |= (lastTimestamp >= lastModifiedTime); +237 } +238 } +239 } catch (FileNotFoundException e) { +240 // Nothing todo, if the tmp dir is empty +241 } +242 +243 // if the snapshot directory wasn't modified since we last check, we are done +244 if (!hasChanges) { +245 return; +246 } +247 +248 // directory was modified, so we need to reload our cache +249 // there could be a slight race here where we miss the cache, check the directory modification +250 // time, then someone updates the directory, causing us to not scan the directory again. +251 // However, snapshot directories are only created once, so this isn't an issue. +252 +253 // 1. update the modified time +254 this.lastModifiedTime = lastTimestamp; +255 +256 // 2.clear the cache +257 this.cache.clear(); +258 Map<String, SnapshotDirectoryInfo> known = new HashMap<String, SnapshotDirectoryInfo>(); +259 +260 // 3. check each of the snapshot directories +261 FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir); +262 if (snapshots == null) { +263 // remove all the remembered snapshots because we don't have any left +264 if (LOG.isDebugEnabled() && this.snapshots.size() > 0) { +265 LOG.debug("No snapshots on-disk, cache empty"); +266 } +267 this.snapshots.clear(); +268 return; +269 } +270 +271 // 3.1 iterate through the on-disk snapshots +272 for (FileStatus snapshot : snapshots) { +273 String name = snapshot.getPath().getName(); +274 // its not the tmp dir, +275 if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) { +276 SnapshotDirectoryInfo files = this.snapshots.remove(name); +277 // 3.1.1 if we don't know about the snapshot or its been modified, we need to update the +278 // files the latter could occur where I create a snapshot, then delete it, and then make a +279 // new snapshot with the same name. We will need to update the cache the information from +280 // that new snapshot, even though it has the same name as the files referenced have +281 // probably changed. +282 if (files == null || files.hasBeenModified(snapshot.getModificationTime())) { +283 // get all files for the snapshot and create a new info +284 Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshot.getPath()); +285 files = new SnapshotDirectoryInfo(snapshot.getModificationTime(), storedFiles); +286 } +287 // 3.2 add all the files to cache +288 this.cache.addAll(files.getFiles()); +289 known.put(name, files); +290 } +291 } +292 +293 // 4. set the snapshots we are tracking +294 this.snapshots.clear(); +295 this.snapshots.putAll(known); +296 } +297 +298 @VisibleForTesting List<String> getSnapshotsInProgress( +299 final SnapshotManager snapshotManager) throws IOException { +300 List<String> snapshotInProgress = Lists.newArrayList(); +301 // only add those files to the cache, but not to the known snapshots +302 Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME); +303 // only add those files to the cache, but not to the known snapshots +304 FileStatus[] running = FSUtils.listStatus(fs, snapshotTmpDir); +305 if (running != null) { +306 for (FileStatus run : running) { +307 ReentrantLock lock = null; +308 if (snapshotManager != null) { +309 lock = snapshotManager.getLocks().acquireLock(run.getPath().getName()); +310 } +311 try { +312 snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(run.getPath())); +313 } catch (CorruptedSnapshotException e) { +314 // See HBASE-16464 +315 if (e.getCause() instanceof FileNotFoundException) { +316 // If the snapshot is corrupt, we will delete it +317 fs.delete(run.getPath(), true); +318 LOG.warn("delete the " + run.getPath() + " due to exception:", e.getCause()); +319 } else { +320 throw e; +321 } +322 } finally { +323 if (lock != null) { +324 lock.unlock(); +325 } +326 } +327 } +328 } +329 return snapshotInProgress; +330 } +331 +332 /** +333 * Simple helper task that just periodically attempts to refresh the cache +334 */ +335 public class RefreshCacheTask extends TimerTask { +336 @Override +337 public void run() { +338 try { +339 SnapshotFileCache.this.refreshCache(); +340 } catch (IOException e) { +341 LOG.warn("Failed to refresh snapshot hfile cache!", e); +342 } 343 } -344 -345 } -346 -347 @Override -348 public boolean isStopped() { -349 return this.stop; -350 } -351 -352 /** -353 * Information about a snapshot directory -354 */ -355 private static class SnapshotDirectoryInfo { -356 long lastModified; -357 Collection<String> files; -358 -359 public SnapshotDirectoryInfo(long mtime, Collection<String> files) { -360 this.lastModified = mtime; -361 this.files = files; -362 } -363 -364 /** -365 * @return the hfiles in the snapshot when <tt>this</tt> was made. -366 */ -367 public Collection<String> getFiles() { -368 return this.files; -369 } -370 -371 /** -372 * Check if the snapshot directory has been modified -373 * @param mtime current modification time of the directory -374 * @return <tt>true</tt> if it the modification time of the directory is newer time when we -375 * created <tt>this</tt> -376 */ -377 public boolean hasBeenModified(long mtime) { -378 return this.lastModified < mtime; -379 } -380 } -381} +344 } +345 +346 @Override +347 public void stop(String why) { +348 if (!this.stop) { +349 this.stop = true; +350 this.refreshTimer.cancel(); +351 } +352 +353 } +354 +355 @Override +356 public boolean isStopped() { +357 return this.stop; +358 } +359 +360 /** +361 * Information about a snapshot directory +362 */ +363 private static class SnapshotDirectoryInfo { +364 long lastModified; +365 Collection<String> files; +366 +367 public SnapshotDirectoryInfo(long mtime, Collection<String> files) { +368 this.lastModified = mtime; +369 this.files = files; +370 } +371 +372 /** +373 * @return the hfiles in the snapshot when <tt>this</tt> was made. +374 */ +375 public Collection<String> getFiles() { +376 return this.files; +377 } +378 +379 /** +380 * Check if the snapshot directory has been modified +381 * @param mtime current modification time of the directory +382 * @return <tt>true</tt> if it the modification time of the directory is newer time when we +383 * created <tt>this</tt> +384 */ +385 public boolean hasBeenModified(long mtime) { +386 return this.lastModified < mtime; +387 } +388 } +389}