Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D8AEC200D42 for ; Fri, 3 Nov 2017 03:53:54 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id D72A5160BE5; Fri, 3 Nov 2017 02:53:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5B164160BE6 for ; Fri, 3 Nov 2017 03:53:52 +0100 (CET) Received: (qmail 53458 invoked by uid 500); 3 Nov 2017 02:53:51 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 53449 invoked by uid 99); 3 Nov 2017 02:53:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Nov 2017 02:53:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2B783DFE61; Fri, 3 Nov 2017 02:53:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: busbey@apache.org To: commits@hbase.apache.org Date: Fri, 03 Nov 2017 02:53:51 -0000 Message-Id: In-Reply-To: <273b6c4b1020478589020c050948dd45@git.apache.org> References: <273b6c4b1020478589020c050948dd45@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hbase git commit: HBASE-18784 if available, query underlying outputstream capabilities where we need hflush/hsync. archived-at: Fri, 03 Nov 2017 02:53:55 -0000 HBASE-18784 if available, query underlying outputstream capabilities where we need hflush/hsync. 
* pull things that don't rely on HDFS in hbase-server/FSUtils into hbase-common/CommonFSUtils * refactor setStoragePolicy so that it can move into hbase-common/CommonFSUtils, as a side effect update it for Hadoop 2.8,3.0+ * refactor WALProcedureStore so that it handles its own FS interactions * add a reflection-based lookup of stream capabilities * call said lookup in places where we make WALs to make sure hflush/hsync is available. * javadoc / checkstyle cleanup on changes as flagged by yetus Signed-off-by: Chia-Ping Tsai Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e79a007d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e79a007d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e79a007d Branch: refs/heads/master Commit: e79a007dd9810b33cd508986037e17d45b55a705 Parents: 0ff9dab Author: Sean Busbey Authored: Thu Oct 12 10:59:43 2017 -0500 Committer: Sean Busbey Committed: Thu Nov 2 21:29:20 2017 -0500 ---------------------------------------------------------------------- .../apache/hadoop/hbase/util/CommonFSUtils.java | 890 +++++++++++++++++++ .../hadoop/hbase/util/TestCommonFSUtils.java | 164 ++++ .../procedure2/store/wal/WALProcedureStore.java | 70 +- .../procedure2/ProcedureTestingUtility.java | 12 +- .../hbase/procedure2/TestChildProcedures.java | 2 +- .../hbase/procedure2/TestProcedureEvents.java | 2 +- .../procedure2/TestProcedureExecution.java | 2 +- .../hbase/procedure2/TestProcedureMetrics.java | 2 +- .../hbase/procedure2/TestProcedureNonce.java | 2 +- .../hbase/procedure2/TestProcedureRecovery.java | 2 +- .../procedure2/TestProcedureReplayOrder.java | 2 +- .../procedure2/TestStateMachineProcedure.java | 2 +- .../hbase/procedure2/TestYieldProcedures.java | 2 +- ...ProcedureWALLoaderPerformanceEvaluation.java | 2 +- .../wal/ProcedureWALPerformanceEvaluation.java | 9 +- .../store/wal/TestStressWALProcedureStore.java | 2 +- .../store/wal/TestWALProcedureStore.java | 4 +- 
.../org/apache/hadoop/hbase/fs/HFileSystem.java | 3 - .../hbase/io/asyncfs/AsyncFSOutputHelper.java | 11 +- .../org/apache/hadoop/hbase/master/HMaster.java | 18 +- .../hadoop/hbase/master/MasterFileSystem.java | 10 +- .../procedure/MasterProcedureConstants.java | 3 - .../hbase/regionserver/wal/AbstractFSWAL.java | 29 +- .../wal/AbstractProtobufLogWriter.java | 5 +- .../wal/AsyncProtobufLogWriter.java | 5 +- .../regionserver/wal/ProtobufLogWriter.java | 8 +- .../org/apache/hadoop/hbase/util/FSUtils.java | 668 +------------- .../hadoop/hbase/wal/AsyncFSWALProvider.java | 22 +- .../apache/hadoop/hbase/wal/FSHLogProvider.java | 22 +- .../hbase/io/asyncfs/TestLocalAsyncOutput.java | 4 +- .../master/assignment/MockMasterServices.java | 5 +- .../procedure/TestMasterProcedureWalLease.java | 4 +- .../procedure/TestWALProcedureStoreOnHDFS.java | 2 +- .../regionserver/wal/AbstractTestFSWAL.java | 45 +- .../regionserver/wal/AbstractTestWALReplay.java | 11 +- .../apache/hadoop/hbase/util/TestFSUtils.java | 187 ++-- .../apache/hadoop/hbase/wal/IOTestProvider.java | 14 +- 37 files changed, 1344 insertions(+), 903 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java new file mode 100644 index 0000000..bdf148e --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -0,0 +1,890 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; + +import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Utility methods for interacting with the underlying file system. 
+ */ +@InterfaceAudience.Private +public abstract class CommonFSUtils { + private static final Log LOG = LogFactory.getLog(CommonFSUtils.class); + + /** Parameter name for HBase WAL directory */ + public static final String HBASE_WAL_DIR = "hbase.wal.dir"; + + /** Full access permissions (starting point for a umask) */ + public static final String FULL_RWX_PERMISSIONS = "777"; + + protected CommonFSUtils() { + super(); + } + + /** + * Compare of path component. Does not consider schema; i.e. if schemas + * different but path starts with rootPath, + * then the function returns true + * @param rootPath value to check for + * @param path subject to check + * @return True if path starts with rootPath + */ + public static boolean isStartingWithPath(final Path rootPath, final String path) { + String uriRootPath = rootPath.toUri().getPath(); + String tailUriPath = (new Path(path)).toUri().getPath(); + return tailUriPath.startsWith(uriRootPath); + } + + /** + * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the + * '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches, + * the two will equate. + * @param pathToSearch Path we will be trying to match against. + * @param pathTail what to match + * @return True if pathTail is tail on the path of pathToSearch + */ + public static boolean isMatchingTail(final Path pathToSearch, String pathTail) { + return isMatchingTail(pathToSearch, new Path(pathTail)); + } + + /** + * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the + * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider + * schema; i.e. if schemas different but path or subpath matches, the two will equate. 
+ * @param pathToSearch Path we will be trying to match against + * @param pathTail what to match + * @return True if pathTail is tail on the path of pathToSearch + */ + public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) { + if (pathToSearch.depth() != pathTail.depth()) { + return false; + } + Path tailPath = pathTail; + String tailName; + Path toSearch = pathToSearch; + String toSearchName; + boolean result = false; + do { + tailName = tailPath.getName(); + if (tailName == null || tailName.length() <= 0) { + result = true; + break; + } + toSearchName = toSearch.getName(); + if (toSearchName == null || toSearchName.length() <= 0) { + break; + } + // Move up a parent on each path for next go around. Path doesn't let us go off the end. + tailPath = tailPath.getParent(); + toSearch = toSearch.getParent(); + } while(tailName.equals(toSearchName)); + return result; + } + + /** + * Delete if exists. + * @param fs filesystem object + * @param dir directory to delete + * @return True if deleted dir + * @throws IOException e + */ + public static boolean deleteDirectory(final FileSystem fs, final Path dir) + throws IOException { + return fs.exists(dir) && fs.delete(dir, true); + } + + /** + * Return the number of bytes that large input files should optimally + * be split into to minimize i/o time. 
+ * + * use reflection to search for getDefaultBlockSize(Path f) + * if the method doesn't exist, fall back to using getDefaultBlockSize() + * + * @param fs filesystem object + * @return the default block size for the path's filesystem + * @throws IOException e + */ + public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException { + Method m = null; + Class cls = fs.getClass(); + try { + m = cls.getMethod("getDefaultBlockSize", new Class[] { Path.class }); + } catch (NoSuchMethodException e) { + LOG.info("FileSystem doesn't support getDefaultBlockSize"); + } catch (SecurityException e) { + LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e); + m = null; // could happen on setAccessible() + } + if (m == null) { + return fs.getDefaultBlockSize(path); + } else { + try { + Object ret = m.invoke(fs, path); + return ((Long)ret).longValue(); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + /* + * Get the default replication. 
+ * + * use reflection to search for getDefaultReplication(Path f) + * if the method doesn't exist, fall back to using getDefaultReplication() + * + * @param fs filesystem object + * @param f path of file + * @return default replication for the path's filesystem + * @throws IOException e + */ + public static short getDefaultReplication(final FileSystem fs, final Path path) + throws IOException { + Method m = null; + Class cls = fs.getClass(); + try { + m = cls.getMethod("getDefaultReplication", new Class[] { Path.class }); + } catch (NoSuchMethodException e) { + LOG.info("FileSystem doesn't support getDefaultReplication"); + } catch (SecurityException e) { + LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e); + m = null; // could happen on setAccessible() + } + if (m == null) { + return fs.getDefaultReplication(path); + } else { + try { + Object ret = m.invoke(fs, path); + return ((Number)ret).shortValue(); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + /** + * Returns the default buffer size to use during writes. + * + * The size of the buffer should probably be a multiple of hardware + * page size (4096 on Intel x86), and it determines how much data is + * buffered during read and write operations. + * + * @param fs filesystem object + * @return default buffer size to use during writes + */ + public static int getDefaultBufferSize(final FileSystem fs) { + return fs.getConf().getInt("io.file.buffer.size", 4096); + } + + /** + * Create the specified file on the filesystem. By default, this will: + *
<ol> + * <li>apply the umask in the configuration (if it is enabled)</li> + * <li>use the fs configured buffer size (or 4096 if not set)</li> + * <li>use the default replication</li> + * <li>use the default block size</li> + * <li>not track progress</li> + * </ol>
+ * + * @param fs {@link FileSystem} on which to write the file + * @param path {@link Path} to the file to write + * @param perm intial permissions + * @param overwrite Whether or not the created file should be overwritten. + * @return output stream to the created file + * @throws IOException if the file cannot be created + */ + public static FSDataOutputStream create(FileSystem fs, Path path, + FsPermission perm, boolean overwrite) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite); + } + return fs.create(path, perm, overwrite, getDefaultBufferSize(fs), + getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null); + } + + /** + * Get the file permissions specified in the configuration, if they are + * enabled. + * + * @param fs filesystem that the file will be created on. + * @param conf configuration to read for determining if permissions are + * enabled and which to use + * @param permssionConfKey property key in the configuration to use when + * finding the permission + * @return the permission to use when creating a new file on the fs. If + * special permissions are not specified in the configuration, then + * the default permissions on the the fs will be returned. + */ + public static FsPermission getFilePermissions(final FileSystem fs, + final Configuration conf, final String permssionConfKey) { + boolean enablePermissions = conf.getBoolean( + HConstants.ENABLE_DATA_FILE_UMASK, false); + + if (enablePermissions) { + try { + FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS); + // make sure that we have a mask, if not, go default. 
+ String mask = conf.get(permssionConfKey); + if (mask == null) { + return FsPermission.getFileDefault(); + } + // appy the umask + FsPermission umask = new FsPermission(mask); + return perm.applyUMask(umask); + } catch (IllegalArgumentException e) { + LOG.warn( + "Incorrect umask attempted to be created: " + + conf.get(permssionConfKey) + + ", using default file permissions.", e); + return FsPermission.getFileDefault(); + } + } + return FsPermission.getFileDefault(); + } + + /** + * Verifies root directory path is a valid URI with a scheme + * + * @param root root directory path + * @return Passed root argument. + * @throws IOException if not a valid URI with a scheme + */ + public static Path validateRootPath(Path root) throws IOException { + try { + URI rootURI = new URI(root.toString()); + String scheme = rootURI.getScheme(); + if (scheme == null) { + throw new IOException("Root directory does not have a scheme"); + } + return root; + } catch (URISyntaxException e) { + IOException io = new IOException("Root directory path is not a valid " + + "URI -- check your " + HConstants.HBASE_DIR + " configuration"); + io.initCause(e); + throw io; + } + } + + /** + * Checks for the presence of the WAL log root path (using the provided conf object) in the given + * path. If it exists, this method removes it and returns the String representation of remaining + * relative path. + * @param path must not be null + * @param conf must not be null + * @return String representation of the remaining relative path + * @throws IOException from underlying filesystem + */ + public static String removeWALRootPath(Path path, final Configuration conf) throws IOException { + Path root = getWALRootDir(conf); + String pathStr = path.toString(); + // check that the path is absolute... it has the root path in it. + if (!pathStr.startsWith(root.toString())) { + return pathStr; + } + // if not, return as it is. + return pathStr.substring(root.toString().length() + 1);// remove the "/" too. 
+ } + + /** + * Return the 'path' component of a Path. In Hadoop, Path is an URI. This + * method returns the 'path' component of a Path's URI: e.g. If a Path is + * hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir, + * this method returns /hbase_trunk/TestTable/compaction.dir. + * This method is useful if you want to print out a Path without qualifying + * Filesystem instance. + * @param p Filesystem Path whose 'path' component we are to return. + * @return Path portion of the Filesystem + */ + public static String getPath(Path p) { + return p.toUri().getPath(); + } + + /** + * @param c configuration + * @return {@link Path} to hbase root directory from + * configuration as a qualified Path. + * @throws IOException e + */ + public static Path getRootDir(final Configuration c) throws IOException { + Path p = new Path(c.get(HConstants.HBASE_DIR)); + FileSystem fs = p.getFileSystem(c); + return p.makeQualified(fs); + } + + public static void setRootDir(final Configuration c, final Path root) throws IOException { + c.set(HConstants.HBASE_DIR, root.toString()); + } + + public static void setFsDefault(final Configuration c, final Path root) throws IOException { + c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+ + } + + public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException { + Path p = getRootDir(c); + return p.getFileSystem(c); + } + + /** + * @param c configuration + * @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from + * configuration as a qualified Path. Defaults to HBase root dir. 
+ * @throws IOException e + */ + public static Path getWALRootDir(final Configuration c) throws IOException { + Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR))); + if (!isValidWALRootDir(p, c)) { + return getRootDir(c); + } + FileSystem fs = p.getFileSystem(c); + return p.makeQualified(fs); + } + + @VisibleForTesting + public static void setWALRootDir(final Configuration c, final Path root) throws IOException { + c.set(HBASE_WAL_DIR, root.toString()); + } + + public static FileSystem getWALFileSystem(final Configuration c) throws IOException { + Path p = getWALRootDir(c); + return p.getFileSystem(c); + } + + private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException { + Path rootDir = getRootDir(c); + if (walDir != rootDir) { + if (walDir.toString().startsWith(rootDir.toString() + "/")) { + throw new IllegalStateException("Illegal WAL directory specified. " + + "WAL directories are not permitted to be under the root directory if set."); + } + } + return true; + } + + /** + * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under + * path rootdir + * + * @param rootdir qualified path of HBase root directory + * @param tableName name of table + * @return {@link org.apache.hadoop.fs.Path} for table + */ + public static Path getTableDir(Path rootdir, final TableName tableName) { + return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()), + tableName.getQualifierAsString()); + } + + /** + * Returns the {@link org.apache.hadoop.hbase.TableName} object representing + * the table directory under + * path rootdir + * + * @param tablePath path of table + * @return {@link org.apache.hadoop.fs.Path} for table + */ + public static TableName getTableName(Path tablePath) { + return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName()); + } + + /** + * Returns the {@link org.apache.hadoop.fs.Path} object representing + * the namespace directory under 
path rootdir + * + * @param rootdir qualified path of HBase root directory + * @param namespace namespace name + * @return {@link org.apache.hadoop.fs.Path} for table + */ + public static Path getNamespaceDir(Path rootdir, final String namespace) { + return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, + new Path(namespace))); + } + + /** + * Sets storage policy for given path according to config setting. + * If the passed path is a directory, we'll set the storage policy for all files + * created in the future in said directory. Note that this change in storage + * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle. + * If we're running on a FileSystem implementation that doesn't support the given storage policy + * (or storage policies at all), then we'll issue a log message and continue. + * + * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html + * + * @param fs We only do anything it implements a setStoragePolicy method + * @param conf used to look up storage policy with given key; not modified. + * @param path the Path whose storage policy is to be set + * @param policyKey Key to use pulling a policy from Configuration: + * e.g. HConstants.WAL_STORAGE_POLICY (hbase.wal.storage.policy). + * @param defaultPolicy if the configured policy is equal to this policy name, we will skip + * telling the FileSystem to set a storage policy. 
+ */ + public static void setStoragePolicy(final FileSystem fs, final Configuration conf, + final Path path, final String policyKey, final String defaultPolicy) { + String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT); + if (storagePolicy.equals(defaultPolicy)) { + if (LOG.isTraceEnabled()) { + LOG.trace("default policy of " + defaultPolicy + " requested, exiting early."); + } + return; + } + setStoragePolicy(fs, path, storagePolicy); + } + + // this mapping means that under a federated FileSystem implementation, we'll + // only log the first failure from any of the underlying FileSystems at WARN and all others + // will be at DEBUG. + private static final Map warningMap = + new ConcurrentHashMap(); + + /** + * Sets storage policy for given path. + * If the passed path is a directory, we'll set the storage policy for all files + * created in the future in said directory. Note that this change in storage + * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle. + * If we're running on a version of FileSystem that doesn't support the given storage policy + * (or storage policies at all), then we'll issue a log message and continue. + * + * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html + * + * @param fs We only do anything it implements a setStoragePolicy method + * @param path the Path whose storage policy is to be set + * @param storagePolicy Policy to set on path; see hadoop 2.6+ + * org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g + * 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'. 
+ */ + public static void setStoragePolicy(final FileSystem fs, final Path path, + final String storagePolicy) { + if (storagePolicy == null) { + if (LOG.isTraceEnabled()) { + LOG.trace("We were passed a null storagePolicy, exiting early."); + } + return; + } + final String trimmedStoragePolicy = storagePolicy.trim(); + if (trimmedStoragePolicy.isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace("We were passed an empty storagePolicy, exiting early."); + } + return; + } + invokeSetStoragePolicy(fs, path, trimmedStoragePolicy); + } + + /* + * All args have been checked and are good. Run the setStoragePolicy invocation. + */ + private static void invokeSetStoragePolicy(final FileSystem fs, final Path path, + final String storagePolicy) { + Method m = null; + try { + m = fs.getClass().getDeclaredMethod("setStoragePolicy", + new Class[] { Path.class, String.class }); + m.setAccessible(true); + } catch (NoSuchMethodException e) { + final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584, HDFS-9345 " + + "not available. This is normal and expected on earlier Hadoop versions."; + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn(msg, e); + } else if (LOG.isDebugEnabled()) { + LOG.debug(msg, e); + } + m = null; + } catch (SecurityException e) { + final String msg = "No access to setStoragePolicy on FileSystem from the SecurityManager; " + + "HDFS-6584, HDFS-9345 not available. This is unusual and probably warrants an email " + + "to the user@hbase mailing list. Please be sure to include a link to your configs, and " + + "logs that include this message and period of time before it. 
Logs around service " + + "start up will probably be useful as well."; + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn(msg, e); + } else if (LOG.isDebugEnabled()) { + LOG.debug(msg, e); + } + m = null; // could happen on setAccessible() or getDeclaredMethod() + } + if (m != null) { + try { + m.invoke(fs, path, storagePolicy); + if (LOG.isDebugEnabled()) { + LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path); + } + } catch (Exception e) { + // This swallows FNFE, should we be throwing it? seems more likely to indicate dev + // misuse than a runtime problem with HDFS. + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " + + "DEBUG log level might have more details.", e); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); + } + // check for lack of HDFS-7228 + if (e instanceof InvocationTargetException) { + final Throwable exception = e.getCause(); + if (exception instanceof RemoteException && + HadoopIllegalArgumentException.class.getName().equals( + ((RemoteException)exception).getClassName())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " + + "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " + + "trying to use SSD related policies then you're likely missing HDFS-7228. For " + + "more information see the 'ArchivalStorage' docs for your Hadoop release."); + } + // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation + // that throws UnsupportedOperationException + } else if (exception instanceof UnsupportedOperationException) { + if (LOG.isDebugEnabled()) { + LOG.debug("The underlying FileSystem implementation doesn't support " + + "setStoragePolicy. 
This is probably intentional on their part, since HDFS-9345 " + + "appears to be present in your version of Hadoop. For more information check " + + "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " + + "specification docs from HADOOP-11981, and/or related documentation from the " + + "provider of the underlying FileSystem (its name should appear in the " + + "stacktrace that accompanies this message). Note in particular that Hadoop's " + + "local filesystem implementation doesn't support storage policies.", exception); + } + } + } + } + } + } + + /** + * @param conf must not be null + * @return True if this filesystem whose scheme is 'hdfs'. + * @throws IOException from underlying FileSystem + */ + public static boolean isHDFS(final Configuration conf) throws IOException { + FileSystem fs = FileSystem.get(conf); + String scheme = fs.getUri().getScheme(); + return scheme.equalsIgnoreCase("hdfs"); + } + + /** + * Checks if the given path is the one with 'recovered.edits' dir. + * @param path must not be null + * @return True if we recovered edits + */ + public static boolean isRecoveredEdits(Path path) { + return path.toString().contains(HConstants.RECOVERED_EDITS_DIR); + } + + /** + * @param conf must not be null + * @return Returns the filesystem of the hbase rootdir. + * @throws IOException from underlying FileSystem + */ + public static FileSystem getCurrentFileSystem(Configuration conf) + throws IOException { + return getRootDir(conf).getFileSystem(conf); + } + + /** + * Calls fs.listStatus() and treats FileNotFoundException as non-fatal + * This accommodates differences between hadoop versions, where hadoop 1 + * does not throw a FileNotFoundException, and return an empty FileStatus[] + * while Hadoop 2 will throw FileNotFoundException. + * + * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem, + * Path, FileStatusFilter) instead. 
+ * + * @param fs file system + * @param dir directory + * @param filter path filter + * @return null if dir is empty or doesn't exist, otherwise FileStatus array + */ + public static FileStatus [] listStatus(final FileSystem fs, + final Path dir, final PathFilter filter) throws IOException { + FileStatus [] status = null; + try { + status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter); + } catch (FileNotFoundException fnfe) { + // if directory doesn't exist, return null + if (LOG.isTraceEnabled()) { + LOG.trace(dir + " doesn't exist"); + } + } + if (status == null || status.length < 1) { + return null; + } + return status; + } + + /** + * Calls fs.listStatus() and treats FileNotFoundException as non-fatal + * This would accommodates differences between hadoop versions + * + * @param fs file system + * @param dir directory + * @return null if dir is empty or doesn't exist, otherwise FileStatus array + */ + public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException { + return listStatus(fs, dir, null); + } + + /** + * Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call + * + * @param fs file system + * @param dir directory + * @return LocatedFileStatus list + */ + public static List listLocatedStatus(final FileSystem fs, + final Path dir) throws IOException { + List status = null; + try { + RemoteIterator locatedFileStatusRemoteIterator = fs + .listFiles(dir, false); + while (locatedFileStatusRemoteIterator.hasNext()) { + if (status == null) { + status = Lists.newArrayList(); + } + status.add(locatedFileStatusRemoteIterator.next()); + } + } catch (FileNotFoundException fnfe) { + // if directory doesn't exist, return null + if (LOG.isTraceEnabled()) { + LOG.trace(dir + " doesn't exist"); + } + } + return status; + } + + /** + * Calls fs.delete() and returns the value returned by the fs.delete() + * + * @param fs must not be null + * @param path must not be null + * @param 
recursive delete tree rooted at path + * @return the value returned by the fs.delete() + * @throws IOException from underlying FileSystem + */ + public static boolean delete(final FileSystem fs, final Path path, final boolean recursive) + throws IOException { + return fs.delete(path, recursive); + } + + /** + * Calls fs.exists(). Checks if the specified path exists + * + * @param fs must not be null + * @param path must not be null + * @return the value returned by fs.exists() + * @throws IOException from underlying FileSystem + */ + public static boolean isExists(final FileSystem fs, final Path path) throws IOException { + return fs.exists(path); + } + + /** + * Log the current state of the filesystem from a certain root directory + * @param fs filesystem to investigate + * @param root root file/directory to start logging from + * @param LOG log to output information + * @throws IOException if an unexpected exception occurs + */ + public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG) + throws IOException { + LOG.debug("Current file system:"); + logFSTree(LOG, fs, root, "|-"); + } + + /** + * Recursive helper to log the state of the FS + * + * @see #logFileSystemState(FileSystem, Path, Log) + */ + private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix) + throws IOException { + FileStatus[] files = listStatus(fs, root, null); + if (files == null) { + return; + } + + for (FileStatus file : files) { + if (file.isDirectory()) { + LOG.debug(prefix + file.getPath().getName() + "/"); + logFSTree(LOG, fs, file.getPath(), prefix + "---"); + } else { + LOG.debug(prefix + file.getPath().getName()); + } + } + } + + public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest) + throws IOException { + // set the modify time for TimeToLive Cleaner + fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1); + return fs.rename(src, dest); + } + + /** + * Do our short circuit 
read setup. + * Checks buffer size to use and whether to do checksumming in hbase or hdfs. + * @param conf must not be null + */ + public static void setupShortCircuitRead(final Configuration conf) { + // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property. + boolean shortCircuitSkipChecksum = + conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false); + boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); + if (shortCircuitSkipChecksum) { + LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " + + "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " + + "it, see https://issues.apache.org/jira/browse/HBASE-6868." : "")); + assert !shortCircuitSkipChecksum; //this will fail if assertions are on + } + checkShortCircuitReadBufferSize(conf); + } + + /** + * Check if short circuit read buffer size is set and if not, set it to hbase value. + * @param conf must not be null + */ + public static void checkShortCircuitReadBufferSize(final Configuration conf) { + final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2; + final int notSet = -1; + // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2 + final String dfsKey = "dfs.client.read.shortcircuit.buffer.size"; + int size = conf.getInt(dfsKey, notSet); + // If a size is set, return -- we will use it. + if (size != notSet) { + return; + } + // But short circuit buffer size is normally not set. Put in place the hbase wanted size. + int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize); + conf.setIfUnset(dfsKey, Integer.toString(hbaseSize)); + } + + // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and + // not until we attempt to reference it. 
+ private static class StreamCapabilities { + public static final boolean PRESENT; + public static final Class CLASS; + public static final Method METHOD; + static { + boolean tmp = false; + Class clazz = null; + Method method = null; + try { + clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities"); + method = clazz.getMethod("hasCapability", String.class); + tmp = true; + } catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) { + LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " + + "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " + + "support hflush/hsync. If you are running on top of HDFS this probably just " + + "means you have an older version and this can be ignored. If you are running on " + + "top of an alternate FileSystem implementation you should manually verify that " + + "hflush and hsync are implemented; otherwise you risk data loss and hard to " + + "diagnose errors when our assumptions are violated."); + LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.", + exception); + } finally { + PRESENT = tmp; + CLASS = clazz; + METHOD = method; + } + } + } + + /** + * If our FileSystem version includes the StreamCapabilities class, check if + * the given stream has a particular capability. + * @param stream capabilities are per-stream instance, so check this one specifically. must not be + * null + * @param capability what to look for, per Hadoop Common's FileSystem docs + * @return true if there are no StreamCapabilities. false if there are, but this stream doesn't + * implement it. return result of asking the stream otherwise. 
+ */ + public static boolean hasCapability(FSDataOutputStream stream, String capability) { + // be consistent whether or not StreamCapabilities is present + if (stream == null) { + throw new NullPointerException("stream parameter must not be null."); + } + // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything + // otherwise old versions of Hadoop will break. + boolean result = true; + if (StreamCapabilities.PRESENT) { + // if StreamCapabilities is present, but the stream doesn't implement it + // or we run into a problem invoking the method, + // we treat that as equivalent to not declaring anything + result = false; + if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) { + try { + result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue(); + } catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException + exception) { + LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " + + "our understanding of how it's supposed to work. Please file a JIRA and include " + + "the following stack trace. In the mean time we're interpreting this behavior " + + "difference as a lack of capability support, which will probably cause a failure.", + exception); + } + } + } + return result; + } + + /** + * Helper exception for those cases where the place where we need to check a stream capability + * is not where we have the needed context to explain the impact and mitigation for a lack. 
+ */ + public static class StreamLacksCapabilityException extends Exception { + public StreamLacksCapabilityException(String message, Throwable cause) { + super(message, cause); + } + public StreamLacksCapabilityException(String message) { + super(message); + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java new file mode 100644 index 0000000..7ff5792 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java @@ -0,0 +1,164 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test {@link CommonFSUtils}. + */ +@Category({MiscTests.class, MediumTests.class}) +public class TestCommonFSUtils { + private static final Log LOG = LogFactory.getLog(TestCommonFSUtils.class); + + private HBaseCommonTestingUtility htu; + private Configuration conf; + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + conf = htu.getConfiguration(); + } + + /** + * Test path compare and prefix checking. 
+ */ + @Test + public void testMatchingTail() throws IOException { + Path rootdir = htu.getDataTestDir(); + final FileSystem fs = rootdir.getFileSystem(conf); + assertTrue(rootdir.depth() > 1); + Path partPath = new Path("a", "b"); + Path fullPath = new Path(rootdir, partPath); + Path fullyQualifiedPath = fs.makeQualified(fullPath); + assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath)); + assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath.toString())); + assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullPath.toString())); + assertTrue(CommonFSUtils.isStartingWithPath(fullyQualifiedPath, fullPath.toString())); + assertFalse(CommonFSUtils.isStartingWithPath(rootdir, partPath.toString())); + assertFalse(CommonFSUtils.isMatchingTail(fullyQualifiedPath, partPath)); + assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath)); + assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath.toString())); + assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fs.makeQualified(fullPath))); + assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullyQualifiedPath.toString())); + assertFalse(CommonFSUtils.isMatchingTail(fullPath, new Path("x"))); + assertFalse(CommonFSUtils.isMatchingTail(new Path("x"), fullPath)); + } + + private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) + throws Exception { + FSDataOutputStream out = fs.create(file); + byte [] data = new byte[dataSize]; + out.write(data, 0, dataSize); + out.close(); + } + + @Test + public void testSetWALRootDir() throws Exception { + Path p = new Path("file:///hbase/root"); + CommonFSUtils.setWALRootDir(conf, p); + assertEquals(p.toString(), conf.get(CommonFSUtils.HBASE_WAL_DIR)); + } + + @Test + public void testGetWALRootDir() throws IOException { + Path root = new Path("file:///hbase/root"); + Path walRoot = new Path("file:///hbase/logroot"); + CommonFSUtils.setRootDir(conf, root); + assertEquals(CommonFSUtils.getRootDir(conf), root); + 
assertEquals(CommonFSUtils.getWALRootDir(conf), root); + CommonFSUtils.setWALRootDir(conf, walRoot); + assertEquals(CommonFSUtils.getWALRootDir(conf), walRoot); + } + + @Test(expected=IllegalStateException.class) + public void testGetWALRootDirIllegalWALDir() throws IOException { + Path root = new Path("file:///hbase/root"); + Path invalidWALDir = new Path("file:///hbase/root/logroot"); + CommonFSUtils.setRootDir(conf, root); + CommonFSUtils.setWALRootDir(conf, invalidWALDir); + CommonFSUtils.getWALRootDir(conf); + } + + @Test + public void testRemoveWALRootPath() throws Exception { + CommonFSUtils.setRootDir(conf, new Path("file:///user/hbase")); + Path testFile = new Path(CommonFSUtils.getRootDir(conf), "test/testfile"); + Path tmpFile = new Path("file:///test/testfile"); + assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), "test/testfile"); + assertEquals(CommonFSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString()); + CommonFSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir")); + assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), testFile.toString()); + Path logFile = new Path(CommonFSUtils.getWALRootDir(conf), "test/testlog"); + assertEquals(CommonFSUtils.removeWALRootPath(logFile, conf), "test/testlog"); + } + + @Test(expected=NullPointerException.class) + public void streamCapabilitiesDoesNotAllowNullStream() { + CommonFSUtils.hasCapability(null, "hopefully any string"); + } + + private static final boolean STREAM_CAPABILITIES_IS_PRESENT; + static { + boolean tmp = false; + try { + Class.forName("org.apache.hadoop.fs.StreamCapabilities"); + tmp = true; + LOG.debug("Test thought StreamCapabilities class was present."); + } catch (ClassNotFoundException exception) { + LOG.debug("Test didn't think StreamCapabilities class was present."); + } finally { + STREAM_CAPABILITIES_IS_PRESENT = tmp; + } + } + + @Test + public void checkStreamCapabilitiesOnKnownNoopStream() throws IOException { + FSDataOutputStream stream = new 
FSDataOutputStream(new ByteArrayOutputStream(), null); + assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " + + "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT, + CommonFSUtils.hasCapability(stream, "hsync")); + assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " + + "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT, + CommonFSUtils.hasCapability(stream, "hflush")); + assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " + + "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT, + CommonFSUtils.hasCapability(stream, "a capability that hopefully no filesystem will " + + "implement.")); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 974bc13..f49833c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -46,18 +46,20 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; import 
org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; /** * WAL implementation of the ProcedureStore. @@ -67,6 +69,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe public class WALProcedureStore extends ProcedureStoreBase { private static final Log LOG = LogFactory.getLog(WALProcedureStore.class); public static final String LOG_PREFIX = "pv2-"; + /** Used to construct the name of the log directory for master procedures */ + public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs"; + public interface LeaseRecovery { void recoverFileLease(FileSystem fs, Path path) throws IOException; @@ -185,18 +190,42 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir, - final LeaseRecovery leaseRecovery) { - this(conf, fs, walDir, null, leaseRecovery); + public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery) + throws IOException { + this(conf, + new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR), + new Path(CommonFSUtils.getRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME), leaseRecovery); } - public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir, - final Path walArchiveDir, final LeaseRecovery leaseRecovery) { - this.fs = fs; + @VisibleForTesting + public WALProcedureStore(final Configuration conf, final Path walDir, final Path 
walArchiveDir, + final LeaseRecovery leaseRecovery) throws IOException { this.conf = conf; + this.leaseRecovery = leaseRecovery; this.walDir = walDir; this.walArchiveDir = walArchiveDir; - this.leaseRecovery = leaseRecovery; + this.fs = walDir.getFileSystem(conf); + + // Create the log directory for the procedure store + if (!fs.exists(walDir)) { + if (!fs.mkdirs(walDir)) { + throw new IOException("Unable to mkdir " + walDir); + } + } + // Now that it exists, set the log policy + CommonFSUtils.setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY, + HConstants.DEFAULT_WAL_STORAGE_POLICY); + + // Create archive dir up front. Rename won't work w/o it up on HDFS. + if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { + if (this.fs.mkdirs(this.walArchiveDir)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Created Procedure Store WAL archive dir " + this.walArchiveDir); + } + } else { + LOG.warn("Failed create of " + this.walArchiveDir); + } + } } @Override @@ -247,16 +276,6 @@ public class WALProcedureStore extends ProcedureStoreBase { } }; syncThread.start(); - - // Create archive dir up front. Rename won't work w/o it up on HDFS. - if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { - if (this.fs.mkdirs(this.walArchiveDir)) { - if (LOG.isDebugEnabled()) LOG.debug("Created Procedure Store WAL archive dir " + - this.walArchiveDir); - } else { - LOG.warn("Failed create of " + this.walArchiveDir); - } - } } @Override @@ -1005,6 +1024,17 @@ public class WALProcedureStore extends ProcedureStoreBase { LOG.warn("failed to create log file with id=" + logId, re); return false; } + // After we create the stream but before we attempt to use it at all + // ensure that we can provide the level of data safety we're configured + // to provide. + final String durability = useHsync ? 
"hsync" : "hflush"; + if (!(CommonFSUtils.hasCapability(newStream, durability))) { + throw new IllegalStateException("The procedure WAL relies on the ability to " + durability + + " for proper operation during component failures, but the underlying filesystem does " + + "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY + + "' to set the desired level of robustness and ensure the config value of '" + + CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it."); + } try { ProcedureWALFormat.writeHeader(newStream, header); startPos = newStream.getPos(); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 99d3c28..6e0c02e 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -51,14 +51,14 @@ public class ProcedureTestingUtility { private ProcedureTestingUtility() { } - public static ProcedureStore createStore(final Configuration conf, final FileSystem fs, - final Path baseDir) throws IOException { - return createWalStore(conf, fs, baseDir); + public static ProcedureStore createStore(final Configuration conf, final Path dir) + throws IOException { + return createWalStore(conf, dir); } - public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs, - final Path walDir) throws IOException { - return new WALProcedureStore(conf, fs, walDir, new WALProcedureStore.LeaseRecovery() { + public static WALProcedureStore createWalStore(final Configuration conf, final 
Path dir) + throws IOException { + return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() { @Override public void recoverFileLease(FileSystem fs, Path path) throws IOException { // no-op http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java index 1a4dd86..4c1611a 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java @@ -60,7 +60,7 @@ public class TestChildProcedures { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java index ce9795f..bd310fd 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java @@ -61,7 +61,7 @@ public class 
TestProcedureEvents { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procStore.start(1); procExecutor.start(1, true); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java index 1a3f898..ed6d512 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java @@ -63,7 +63,7 @@ public class TestProcedureExecution { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java index 0a57efa..6246629 100644 --- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java @@ -66,7 +66,7 @@ public class TestProcedureMetrics { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java index ec2e54e..12a8012 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java @@ -67,7 +67,7 @@ public class TestProcedureNonce { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java ---------------------------------------------------------------------- diff 
--git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index b0f6cbc..06f8833 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -69,7 +69,7 @@ public class TestProcedureRecovery { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java index 23ca6ba..12b2184 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java @@ -68,7 +68,7 @@ public class TestProcedureReplayOrder { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcedureEnv(); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procStore.start(NUM_THREADS); 
procExecutor.start(1, true); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java index 8347dbf..cbe50f2 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java @@ -76,7 +76,7 @@ public class TestStateMachineProcedure { fs = testDir.getFileSystem(htu.getConfiguration()); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java index 4882168..017992c 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java @@ -65,7 +65,7 @@ public class TestYieldProcedures { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); - procStore = 
ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procRunnables = new TestScheduler(); procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java index 5554a6c..503850d 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java @@ -126,7 +126,7 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool { Path logDir = new Path(testDir, "proc-logs"); System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n"); fs.delete(logDir, true); - store = ProcedureTestingUtility.createWalStore(conf, fs, logDir); + store = ProcedureTestingUtility.createWalStore(conf, logDir); store.start(1); store.recoverLease(); store.load(new LoadCounter()); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java index 823972f..1a7fc80 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java @@ -93,9 +93,9 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { System.out.println("Logs directory : " + logDir.toString()); fs.delete(logDir, true); if ("nosync".equals(syncType)) { - store = new NoSyncWalProcedureStore(conf, fs, logDir); + store = new NoSyncWalProcedureStore(conf, logDir); } else { - store = ProcedureTestingUtility.createWalStore(conf, fs, logDir); + store = ProcedureTestingUtility.createWalStore(conf, logDir); } store.start(numThreads); store.recoverLease(); @@ -244,9 +244,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { } private class NoSyncWalProcedureStore extends WALProcedureStore { - public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs, - final Path logDir) { - super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() { + public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException { + super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() { @Override public void recoverFileLease(FileSystem fs, Path path) throws IOException { // no-op http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java index 610688f..98ec114 100644 --- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java @@ -75,7 +75,7 @@ public class TestStressWALProcedureStore { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procStore.start(PROCEDURE_STORE_SLOTS); procStore.recoverLease(); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 44c8e12..98b1b7c 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -86,7 +86,7 @@ public class TestWALProcedureStore { setupConfig(htu.getConfiguration()); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procStore.start(PROCEDURE_STORE_SLOTS); procStore.recoverLease(); procStore.load(new LoadCounter()); @@ -729,7 +729,7 @@ public class TestWALProcedureStore { assertEquals(procs.length + 1, status.length); // simulate another active master removing the wals - procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir, + procStore = new WALProcedureStore(htu.getConfiguration(), 
logDir, null, new WALProcedureStore.LeaseRecovery() { private int count = 0; http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java index bb34af6..f7eb02b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java @@ -65,9 +65,6 @@ import edu.umd.cs.findbugs.annotations.Nullable; public class HFileSystem extends FilterFileSystem { public static final Log LOG = LogFactory.getLog(HFileSystem.class); - /** Parameter name for HBase WAL directory */ - public static final String HBASE_WAL_DIR = "hbase.wal.dir"; - private final FileSystem noChecksumFs; // read hfile data from storage private final boolean useHBaseChecksum; private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE; http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index 04bf01f..1f5462f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.CommonFSUtils; 
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -56,7 +57,8 @@ public final class AsyncFSOutputHelper { */ public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoop eventLoop, - Class channelClass) throws IOException { + Class channelClass) + throws IOException, CommonFSUtils.StreamLacksCapabilityException { if (fs instanceof DistributedFileSystem) { return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, overwrite, createParent, replication, blockSize, eventLoop, channelClass); @@ -69,6 +71,13 @@ public final class AsyncFSOutputHelper { } else { fsOut = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null); } + // After we create the stream but before we attempt to use it at all + // ensure that we can provide the level of data safety we're configured + // to provide. + if (!(CommonFSUtils.hasCapability(fsOut, "hflush") && + CommonFSUtils.hasCapability(fsOut, "hsync"))) { + throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync"); + } final ExecutorService flushExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build()); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 3ba31da..7a778f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -53,7 +53,6 @@ import java.util.regex.Pattern; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.ClusterStatus.Option; @@ -1201,23 +1200,8 @@ public class HMaster extends HRegionServer implements MasterServices { private void startProcedureExecutor() throws IOException { final MasterProcedureEnv procEnv = new MasterProcedureEnv(this); final Path rootDir = FSUtils.getRootDir(conf); - final Path walDir = new Path(FSUtils.getWALRootDir(this.conf), - MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); - final Path walArchiveDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - final FileSystem walFs = walDir.getFileSystem(conf); - - // Create the log directory for the procedure store - if (!walFs.exists(walDir)) { - if (!walFs.mkdirs(walDir)) { - throw new IOException("Unable to mkdir " + walDir); - } - } - // Now that it exists, set the log policy - FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY, - HConstants.DEFAULT_WAL_STORAGE_POLICY); - - procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, walArchiveDir, + procedureStore = new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this)); procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler(); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 3b268cb..27987f6 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSTableDescriptors; @@ -144,10 +144,10 @@ public class MasterFileSystem { }; final String[] protectedSubLogDirs = new String[] { - HConstants.HREGION_LOGDIR_NAME, - HConstants.HREGION_OLDLOGDIR_NAME, - HConstants.CORRUPT_DIR_NAME, - MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR + HConstants.HREGION_LOGDIR_NAME, + HConstants.HREGION_OLDLOGDIR_NAME, + HConstants.CORRUPT_DIR_NAME, + WALProcedureStore.MASTER_PROCEDURE_LOGDIR }; // check if the root directory exists checkRootDir(this.rootdir, conf, this.fs); http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java index 16647d2..495fab6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java @@ -24,9 +24,6 @@ import 
org.apache.yetus.audience.InterfaceAudience; public final class MasterProcedureConstants { private MasterProcedureConstants() {} - /** Used to construct the name of the log directory for master procedures */ - public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs"; - /** Number of threads used by the procedure executor */ public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads"; public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16; http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 61c7100..ad54cab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.DrainBarrier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; @@ -356,7 +356,7 @@ public abstract class AbstractFSWAL implements WAL { } // Now that it exists, set the storage policy for the entire directory of wal files related to // this FSHLog instance - FSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY, + 
CommonFSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); @@ -381,7 +381,7 @@ public abstract class AbstractFSWAL implements WAL { }; if (failIfWALExists) { - final FileStatus[] walFiles = FSUtils.listStatus(fs, walDir, ourFiles); + final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles); if (null != walFiles && 0 != walFiles.length) { throw new IOException("Target WAL already exists within directory " + walDir); } @@ -398,7 +398,7 @@ public abstract class AbstractFSWAL implements WAL { // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks // (it costs a little x'ing bocks) final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", - FSUtils.getDefaultBlockSize(this.fs, this.walDir)); + CommonFSUtils.getDefaultBlockSize(this.fs, this.walDir)); this.logrollsize = (long) (blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); @@ -652,7 +652,7 @@ public abstract class AbstractFSWAL implements WAL { } } LOG.info("Archiving " + p + " to " + newPath); - if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { + if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { throw new IOException("Unable to rename " + p + " to " + newPath); } // Tell our listeners that a log has been archived. @@ -685,12 +685,12 @@ public abstract class AbstractFSWAL implements WAL { try { long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter); int oldNumEntries = this.numEntries.getAndSet(0); - final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath)); + final String newPathString = (null == newPath ? 
null : CommonFSUtils.getPath(newPath)); if (oldPath != null) { this.walFile2Props.put(oldPath, new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); this.totalLogSize.addAndGet(oldFileLen); - LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries + LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries + ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString); } else { LOG.info("New WAL " + newPathString); @@ -767,6 +767,11 @@ public abstract class AbstractFSWAL implements WAL { cleanOldLogs(); regionsToFlush = findRegionsToForceFlush(); } + } catch (CommonFSUtils.StreamLacksCapabilityException exception) { + // If the underlying FileSystem can't do what we ask, treat as IO failure so + // we'll abort. + throw new IOException("Underlying FileSystem can't meet stream requirements. See RS log " + + "for details.", exception); } finally { closeBarrier.endOp(); assert scope == NullScope.INSTANCE || !scope.isDetached(); @@ -794,7 +799,7 @@ public abstract class AbstractFSWAL implements WAL { * @return may be null if there are no files. */ protected FileStatus[] getFiles() throws IOException { - return FSUtils.listStatus(fs, walDir, ourFiles); + return CommonFSUtils.listStatus(fs, walDir, ourFiles); } @Override @@ -833,7 +838,7 @@ public abstract class AbstractFSWAL implements WAL { } } - if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { + if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { throw new IOException("Unable to rename " + file.getPath() + " to " + p); } // Tell our listeners that a log was archived. 
@@ -843,7 +848,8 @@ public abstract class AbstractFSWAL implements WAL { } } } - LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.walArchiveDir)); + LOG.debug("Moved " + files.length + " WAL file(s) to " + + CommonFSUtils.getPath(this.walArchiveDir)); } LOG.info("Closed WAL: " + toString()); } @@ -1022,7 +1028,8 @@ public abstract class AbstractFSWAL implements WAL { protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; - protected abstract W createWriterInstance(Path path) throws IOException; + protected abstract W createWriterInstance(Path path) throws IOException, + CommonFSUtils.StreamLacksCapabilityException; /** * @return old wal file size http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index 3747f47..256ced6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.EncryptionTest; import org.apache.hadoop.hbase.util.FSUtils; @@ -153,7 +154,7 @@ public abstract class AbstractProtobufLogWriter { } public void init(FileSystem fs, Path path, 
Configuration conf, boolean overwritable) - throws IOException { + throws IOException, StreamLacksCapabilityException { this.conf = conf; boolean doCompress = initializeCompressionContext(conf, path); this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); @@ -237,7 +238,7 @@ public abstract class AbstractProtobufLogWriter { } protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize) throws IOException; + short replication, long blockSize) throws IOException, StreamLacksCapabilityException; /** * return the file length after written.