hive-commits mailing list archives

From ser...@apache.org
Subject [19/22] hive git commit: HIVE-17537 Move Warehouse class to standalone metastore. This closes #252. (Alan Gates, reviewed by Zoltan Haindrich)
Date Wed, 20 Sep 2017 01:41:26 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/56083008/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
new file mode 100755
index 0000000..649437f
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.HdfsUtils;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This class represents a warehouse where data of Hive tables is stored
+ */
+public class Warehouse {
+  public static final String DEFAULT_DATABASE_NAME = "default";
+  public static final String DEFAULT_DATABASE_COMMENT = "Default Hive database";
+  public static final String DEFAULT_SERIALIZATION_FORMAT = "1";
+  public static final String DATABASE_WAREHOUSE_SUFFIX = ".db";
+
+  private Path whRoot;
+  private final Configuration conf;
+  private final String whRootString;
+
+  public static final Logger LOG = LoggerFactory.getLogger("hive.metastore.warehouse");
+
+  private MetaStoreFS fsHandler = null;
+  private boolean storageAuthCheck = false;
+  private ReplChangeManager cm = null;
+
+  public Warehouse(Configuration conf) throws MetaException {
+    this.conf = conf;
+    whRootString = MetastoreConf.getVar(conf, ConfVars.WAREHOUSE);
+    if (StringUtils.isBlank(whRootString)) {
+      throw new MetaException(ConfVars.WAREHOUSE.varname
+          + " is not set in the config or blank");
+    }
+    fsHandler = getMetaStoreFsHandler(conf);
+    cm = ReplChangeManager.getInstance(conf);
+    storageAuthCheck = MetastoreConf.getBoolVar(conf, ConfVars.AUTHORIZATION_STORAGE_AUTH_CHECKS);
+  }
+
+  private MetaStoreFS getMetaStoreFsHandler(Configuration conf)
+      throws MetaException {
+    String handlerClassStr = MetastoreConf.getVar(conf, ConfVars.FS_HANDLER_CLS);
+    try {
+      Class<? extends MetaStoreFS> handlerClass = (Class<? extends MetaStoreFS>) Class
+          .forName(handlerClassStr, true, JavaUtils.getClassLoader());
+      MetaStoreFS handler = ReflectionUtils.newInstance(handlerClass, conf);
+      return handler;
+    } catch (ClassNotFoundException e) {
+      throw new MetaException("Error in loading MetaStoreFS handler."
+          + e.getMessage());
+    }
+  }
+
+
+  /**
+   * Helper functions to convert IOException to MetaException
+   */
+  public static FileSystem getFs(Path f, Configuration conf) throws MetaException {
+    try {
+      return f.getFileSystem(conf);
+    } catch (IOException e) {
+      MetaStoreUtils.logAndThrowMetaException(e);
+    }
+    return null;
+  }
+
+  public FileSystem getFs(Path f) throws MetaException {
+    return getFs(f, conf);
+  }
+
+
+  /**
+   * Hadoop File System reverse-lookups paths with raw IP addresses. The File
+   * System URI always contains the canonical DNS name of the Namenode.
+   * Subsequently, operations on paths with raw ip addresses cause an exception
+   * since they don't match the file system URI.
+   *
+   * This routine solves this problem by replacing the scheme and authority of a
+   * path with the scheme and authority of the FileSystem that it maps to.
+   *
+   * @param path
+   *          Path to be canonicalized
+   * @return Path with canonical scheme and authority
+   */
+  public static Path getDnsPath(Path path, Configuration conf) throws MetaException {
+    FileSystem fs = getFs(path, conf);
+    return (new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), path
+        .toUri().getPath()));
+  }
+
+  public Path getDnsPath(Path path) throws MetaException {
+    return getDnsPath(path, conf);
+  }
+
+  /**
+   * Resolve the configured warehouse root dir with respect to the configuration
+   * This involves opening the FileSystem corresponding to the warehouse root
+   * dir (but that should be ok given that this is only called during DDL
+   * statements for non-external tables).
+   */
+  public Path getWhRoot() throws MetaException {
+    if (whRoot != null) {
+      return whRoot;
+    }
+    whRoot = getDnsPath(new Path(whRootString));
+    return whRoot;
+  }
+
+  public Path getDatabasePath(Database db) throws MetaException {
+    if (db.getName().equalsIgnoreCase(DEFAULT_DATABASE_NAME)) {
+      return getWhRoot();
+    }
+    return new Path(db.getLocationUri());
+  }
+
+  public Path getDefaultDatabasePath(String dbName) throws MetaException {
+    if (dbName.equalsIgnoreCase(DEFAULT_DATABASE_NAME)) {
+      return getWhRoot();
+    }
+    return new Path(getWhRoot(), dbName.toLowerCase() + DATABASE_WAREHOUSE_SUFFIX);
+  }
+
+  /**
+   * Returns the default location of the table path using the parent database's location
+   * @param db Database where the table is created
+   * @param tableName table name
+   * @return
+   * @throws MetaException
+   */
+  public Path getDefaultTablePath(Database db, String tableName)
+      throws MetaException {
+    return getDnsPath(new Path(getDatabasePath(db), MetaStoreUtils.encodeTableName(tableName.toLowerCase())));
+  }
+
+  public static String getQualifiedName(Table table) {
+    return table.getDbName() + "." + table.getTableName();
+  }
+
+  public static String getQualifiedName(Partition partition) {
+    return partition.getDbName() + "." + partition.getTableName() + partition.getValues();
+  }
+
+  public boolean mkdirs(Path f) throws MetaException {
+    FileSystem fs;
+    try {
+      fs = getFs(f);
+      return FileUtils.mkdir(fs, f);
+    } catch (IOException e) {
+      MetaStoreUtils.logAndThrowMetaException(e);
+    }
+    return false;
+  }
+
+  public boolean renameDir(Path sourcePath, Path destPath, boolean needCmRecycle) throws MetaException {
+    try {
+      if (needCmRecycle) {
+        // Copy the source files to cmroot. As the client will move the source files to another
+        // location, we should make a copy of the files to cmroot instead of moving it.
+        cm.recycle(sourcePath, RecycleType.COPY, true);
+      }
+      FileSystem fs = getFs(sourcePath);
+      return FileUtils.rename(fs, sourcePath, destPath);
+    } catch (Exception ex) {
+      MetaStoreUtils.logAndThrowMetaException(ex);
+    }
+    return false;
+  }
+
+  void addToChangeManagement(Path file) throws MetaException {
+    cm.recycle(file, RecycleType.COPY, true);
+  }
+
+  public boolean deleteDir(Path f, boolean recursive) throws MetaException {
+    return deleteDir(f, recursive, false);
+  }
+
+  public boolean deleteDir(Path f, boolean recursive, boolean ifPurge) throws MetaException {
+    cm.recycle(f, RecycleType.MOVE, ifPurge);
+    FileSystem fs = getFs(f);
+    return fsHandler.deleteDir(fs, f, recursive, ifPurge, conf);
+  }
+
+  public void recycleDirToCmPath(Path f, boolean ifPurge) throws MetaException {
+    cm.recycle(f, RecycleType.MOVE, ifPurge);
+    return;
+  }
+
+  public boolean isEmpty(Path path) throws IOException, MetaException {
+    ContentSummary contents = getFs(path).getContentSummary(path);
+    if (contents != null && contents.getFileCount() == 0 && contents.getDirectoryCount() == 1) {
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isWritable(Path path) throws IOException {
+    if (!storageAuthCheck) {
+      // no checks for non-secure hadoop installations
+      return true;
+    }
+    if (path == null) { //what??!!
+      return false;
+    }
+    final FileStatus stat;
+    final FileSystem fs;
+    try {
+      fs = getFs(path);
+      stat = fs.getFileStatus(path);
+      HdfsUtils.checkFileAccess(fs, stat, FsAction.WRITE);
+      return true;
+    } catch (FileNotFoundException fnfe){
+      // File named by path doesn't exist; nothing to validate.
+      return true;
+    } catch (Exception e) {
+      // all other exceptions are considered as emanating from
+      // unauthorized accesses
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exception when checking if path (" + path + ")", e);
+      }
+      return false;
+    }
+  }
+
+  private static String escapePathName(String path) {
+    return FileUtils.escapePathName(path);
+  }
+
+  private static String unescapePathName(String path) {
+    return FileUtils.unescapePathName(path);
+  }
+
+  /**
+   * Given a partition specification, return the path corresponding to the
+   * partition spec. By default, the specification does not include dynamic partitions.
+   * @param spec
+   * @return string representation of the partition specification.
+   * @throws MetaException
+   */
+  public static String makePartPath(Map<String, String> spec)
+      throws MetaException {
+    return makePartName(spec, true);
+  }
+
+  /**
+   * Makes a partition name from a specification
+   * @param spec
+   * @param addTrailingSeperator if true, adds a trailing separator e.g. 'ds=1/'
+   * @return partition name
+   * @throws MetaException
+   */
+  public static String makePartName(Map<String, String> spec,
+      boolean addTrailingSeperator)
+      throws MetaException {
+    StringBuilder suffixBuf = new StringBuilder();
+    int i = 0;
+    for (Entry<String, String> e : spec.entrySet()) {
+      if (e.getValue() == null || e.getValue().length() == 0) {
+        throw new MetaException("Partition spec is incorrect. " + spec);
+      }
+      if (i>0) {
+        suffixBuf.append(Path.SEPARATOR);
+      }
+      suffixBuf.append(escapePathName(e.getKey()));
+      suffixBuf.append('=');
+      suffixBuf.append(escapePathName(e.getValue()));
+      i++;
+    }
+    if (addTrailingSeperator) {
+      suffixBuf.append(Path.SEPARATOR);
+    }
+    return suffixBuf.toString();
+  }
+  /**
+   * Given a dynamic partition specification, return the path corresponding to the
+   * static part of partition specification. This is basically a copy of makePartName
+   * but we get rid of MetaException since it is not serializable.
+   * @param spec
+   * @return string representation of the static part of the partition specification.
+   */
+  public static String makeDynamicPartName(Map<String, String> spec) {
+    StringBuilder suffixBuf = new StringBuilder();
+    for (Entry<String, String> e : spec.entrySet()) {
+      if (e.getValue() != null && e.getValue().length() > 0) {
+        suffixBuf.append(escapePathName(e.getKey()));
+        suffixBuf.append('=');
+        suffixBuf.append(escapePathName(e.getValue()));
+        suffixBuf.append(Path.SEPARATOR);
+      } else { // stop once we see a dynamic partition
+        break;
+      }
+    }
+    return suffixBuf.toString();
+  }
+
+  static final Pattern pat = Pattern.compile("([^/]+)=([^/]+)");
+
+  private static final Pattern slash = Pattern.compile("/");
+
+  /**
+   * Extracts values from partition name without the column names.
+   * @param name Partition name.
+   * @param result The result. Must be pre-sized to the expected number of columns.
+   */
+  public static AbstractList<String> makeValsFromName(
+      String name, AbstractList<String> result) throws MetaException {
+    assert name != null;
+    String[] parts = slash.split(name, 0);
+    if (result == null) {
+      result = new ArrayList<>(parts.length);
+      for (int i = 0; i < parts.length; ++i) {
+        result.add(null);
+      }
+    } else if (parts.length != result.size()) {
+      throw new MetaException(
+          "Expected " + result.size() + " components, got " + parts.length + " (" + name
+ ")");
+    }
+    for (int i = 0; i < parts.length; ++i) {
+      int eq = parts[i].indexOf('=');
+      if (eq <= 0) {
+        throw new MetaException("Unexpected component " + parts[i]);
+      }
+      result.set(i, unescapePathName(parts[i].substring(eq + 1)));
+    }
+    return result;
+  }
+
+  public static LinkedHashMap<String, String> makeSpecFromName(String name)
+      throws MetaException {
+    if (name == null || name.isEmpty()) {
+      throw new MetaException("Partition name is invalid. " + name);
+    }
+    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
+    makeSpecFromName(partSpec, new Path(name));
+    return partSpec;
+  }
+
+  public static void makeSpecFromName(Map<String, String> partSpec, Path currPath) {
+    List<String[]> kvs = new ArrayList<>();
+    do {
+      String component = currPath.getName();
+      Matcher m = pat.matcher(component);
+      if (m.matches()) {
+        String k = unescapePathName(m.group(1));
+        String v = unescapePathName(m.group(2));
+        String[] kv = new String[2];
+        kv[0] = k;
+        kv[1] = v;
+        kvs.add(kv);
+      }
+      currPath = currPath.getParent();
+    } while (currPath != null && !currPath.getName().isEmpty());
+
+    // reverse the list since we checked the part from leaf dir to table's base dir
+    for (int i = kvs.size(); i > 0; i--) {
+      partSpec.put(kvs.get(i - 1)[0], kvs.get(i - 1)[1]);
+    }
+  }
+
+  public static Map<String, String> makeEscSpecFromName(String name) throws MetaException {
+
+    if (name == null || name.isEmpty()) {
+      throw new MetaException("Partition name is invalid. " + name);
+    }
+    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
+
+    Path currPath = new Path(name);
+
+    List<String[]> kvs = new ArrayList<>();
+    do {
+      String component = currPath.getName();
+      Matcher m = pat.matcher(component);
+      if (m.matches()) {
+        String k = m.group(1);
+        String v = m.group(2);
+        String[] kv = new String[2];
+        kv[0] = k;
+        kv[1] = v;
+        kvs.add(kv);
+      }
+      currPath = currPath.getParent();
+    } while (currPath != null && !currPath.getName().isEmpty());
+
+    // reverse the list since we checked the part from leaf dir to table's base dir
+    for (int i = kvs.size(); i > 0; i--) {
+      partSpec.put(kvs.get(i - 1)[0], kvs.get(i - 1)[1]);
+    }
+
+    return partSpec;
+  }
+
+  /**
+   * Returns the default partition path of a table within a given database, for the given partition key-value
+   * pairs. It uses the database location and appends to it the table name and the partition key-value
+   * pairs to create the Path for the partition directory.
+   *
+   * @param db - parent database which is used to get the base location of the partition directory
+   * @param tableName - table name for the partitions
+   * @param pm - Partition key value pairs
+   * @return
+   * @throws MetaException
+   */
+  public Path getDefaultPartitionPath(Database db, String tableName,
+      Map<String, String> pm) throws MetaException {
+    return getPartitionPath(getDefaultTablePath(db, tableName), pm);
+  }
+
+  /**
+   * Returns the path object for the given partition key-value pairs and the base location
+   *
+   * @param tblPath - the base location for the partitions. Typically the table location
+   * @param pm - Partition key value pairs
+   * @return
+   * @throws MetaException
+   */
+  public Path getPartitionPath(Path tblPath, Map<String, String> pm)
+      throws MetaException {
+    return new Path(tblPath, makePartPath(pm));
+  }
+
+  /**
+   * Given a database, a table and the partition key-value pairs, this method returns the Path object
+   * corresponding to the partition key-value pairs. It uses the table location if available, else
+   * uses the database location for constructing the path corresponding to the partition key-value
+   * pairs.
+   *
+   * @param db - Parent database of the given table
+   * @param table - Table for which the partition key-values are given
+   * @param vals - List of values for the partition keys
+   * @return Path corresponding to the partition key-value pairs
+   * @throws MetaException
+   */
+  public Path getPartitionPath(Database db, Table table, List<String> vals)
+      throws MetaException {
+    List<FieldSchema> partKeys = table.getPartitionKeys();
+    if (partKeys == null || (partKeys.size() != vals.size())) {
+      throw new MetaException("Invalid number of partition keys found for " + table.getTableName());
+    }
+    Map<String, String> pm = new LinkedHashMap<>(vals.size());
+    int i = 0;
+    for (FieldSchema key : partKeys) {
+      pm.put(key.getName(), vals.get(i));
+      i++;
+    }
+
+    if (table.getSd().getLocation() != null) {
+      return getPartitionPath(getDnsPath(new Path(table.getSd().getLocation())), pm);
+    } else {
+      return getDefaultPartitionPath(db, table.getTableName(), pm);
+    }
+  }
+
+  public boolean isDir(Path f) throws MetaException {
+    FileSystem fs;
+    try {
+      fs = getFs(f);
+      FileStatus fstatus = fs.getFileStatus(f);
+      if (!fstatus.isDir()) {
+        return false;
+      }
+    } catch (FileNotFoundException e) {
+      return false;
+    } catch (IOException e) {
+      MetaStoreUtils.logAndThrowMetaException(e);
+    }
+    return true;
+  }
+
+  public static String makePartName(List<FieldSchema> partCols,
+      List<String> vals) throws MetaException {
+    return makePartName(partCols, vals, null);
+  }
+
+  /**
+   * @param desc
+   * @return array of FileStatus objects corresponding to the files
+   * making up the passed storage description
+   */
+  public FileStatus[] getFileStatusesForSD(StorageDescriptor desc)
+      throws MetaException {
+    return getFileStatusesForLocation(desc.getLocation());
+  }
+
+  /**
+   * @param location
+   * @return array of FileStatus objects corresponding to the files
+   * making up the passed location
+   */
+  public FileStatus[] getFileStatusesForLocation(String location)
+      throws MetaException {
+    try {
+      Path path = new Path(location);
+      FileSystem fileSys = path.getFileSystem(conf);
+      return FileUtils.getFileStatusRecurse(path, -1, fileSys);
+    } catch (IOException ioe) {
+      MetaStoreUtils.logAndThrowMetaException(ioe);
+    }
+    return null;
+  }
+
+  /**
+   * @param db database
+   * @param table table
+   * @return array of FileStatus objects corresponding to the files making up the passed
+   * unpartitioned table
+   */
+  public FileStatus[] getFileStatusesForUnpartitionedTable(Database db, Table table)
+      throws MetaException {
+    Path tablePath = getDnsPath(new Path(table.getSd().getLocation()));
+    try {
+      FileSystem fileSys = tablePath.getFileSystem(conf);
+      return FileUtils.getFileStatusRecurse(tablePath, -1, fileSys);
+    } catch (IOException ioe) {
+      MetaStoreUtils.logAndThrowMetaException(ioe);
+    }
+    return null;
+  }
+
+  /**
+   * Makes a valid partition name.
+   * @param partCols The partition columns
+   * @param vals The partition values
+   * @param defaultStr
+   *    The default name given to a partition value if the respective value is empty or null.
+   * @return An escaped, valid partition name.
+   * @throws MetaException
+   */
+  public static String makePartName(List<FieldSchema> partCols,
+      List<String> vals, String defaultStr) throws MetaException {
+    if ((partCols.size() != vals.size()) || (partCols.size() == 0)) {
+      String errorStr = "Invalid partition key & values; keys [";
+      for (FieldSchema fs : partCols) {
+        errorStr += (fs.getName() + ", ");
+      }
+      errorStr += "], values [";
+      for (String val : vals) {
+        errorStr += (val + ", ");
+      }
+      throw new MetaException(errorStr + "]");
+    }
+    List<String> colNames = new ArrayList<>();
+    for (FieldSchema col: partCols) {
+      colNames.add(col.getName());
+    }
+    return FileUtils.makePartName(colNames, vals, defaultStr);
+  }
+
+  public static List<String> getPartValuesFromPartName(String partName)
+      throws MetaException {
+    LinkedHashMap<String, String> partSpec = Warehouse.makeSpecFromName(partName);
+    List<String> values = new ArrayList<>();
+    values.addAll(partSpec.values());
+    return values;
+  }
+
+  public static Map<String, String> makeSpecFromValues(List<FieldSchema> partCols,
+      List<String> values) {
+    Map<String, String> spec = new LinkedHashMap<>();
+    for (int i = 0; i < values.size(); i++) {
+      spec.put(partCols.get(i).getName(), values.get(i));
+    }
+    return spec;
+  }
+}
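
A minimal usage sketch of the partition-name helpers in the Warehouse class above; the partition columns and values here are made up purely for illustration:

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;

public class PartNameSketch {
  public static void main(String[] args) throws MetaException {
    // LinkedHashMap keeps the partition columns in declaration order.
    Map<String, String> spec = new LinkedHashMap<>();
    spec.put("ds", "2017-09-20");
    spec.put("hr", "01");

    // Escapes each key and value and joins them with '/': ds=2017-09-20/hr=01
    String name = Warehouse.makePartName(spec, false);

    // Parses the name back into an ordered key/value spec, unescaping each component.
    Map<String, String> roundTrip = Warehouse.makeSpecFromName(name);
    System.out.println(name + " -> " + roundTrip);
  }
}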

http://git-wip-us.apache.org/repos/asf/hive/blob/56083008/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 0fb878a..d3c714b 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -534,6 +534,14 @@ public class MetastoreConf {
         "Inteval for cmroot cleanup thread."),
     REPLCMENABLED("metastore.repl.cm.enabled", "hive.repl.cm.enabled", false,
         "Turn on ChangeManager, so delete files will go to cmrootdir."),
+    REPL_COPYFILE_MAXNUMFILES("metastore.repl.copyfile.maxnumfiles",
+        "hive.exec.copyfile.maxnumfiles", 1L,
+        "Maximum number of files Hive uses to do sequential HDFS copies between directories."
+
+            "Distributed copies (distcp) will be used instead for larger numbers of files
so that copies can be done faster."),
+    REPL_COPYFILE_MAXSIZE("metastore.repl.copyfile.maxsize",
+        "hive.exec.copyfile.maxsize", 32L * 1024 * 1024 /*32M*/,
+        "Maximum file size (in bytes) that Hive uses to do single HDFS copies between directories."
+
+            "Distributed copies (distcp) will be used instead for bigger files so that copies
can be done faster."),
     SCHEMA_INFO_CLASS("metastore.schema.info.class", "hive.metastore.schema.info.class",
         "org.apache.hadoop.hive.metastore.MetaStoreSchemaInfo",
         "Fully qualified class name for the metastore schema information class \n"

http://git-wip-us.apache.org/repos/asf/hive/blob/56083008/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
index 2310df6..da0ee80 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
@@ -18,29 +18,47 @@
 package org.apache.hadoop.hive.metastore.utils;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
 
 public class FileUtils {
   private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);
 
+  public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
   /**
    * Move a particular file or directory to the trash.
    * @param fs FileSystem to use
    * @param f path of file or directory to move to trash.
-   * @param conf
+   * @param conf configuration object
    * @return true if move successful
    * @throws IOException
    */
   public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf, boolean purge)
       throws IOException {
     LOG.debug("deleting  " + f);
-    boolean result = false;
+    boolean result;
     try {
       if(purge) {
         LOG.debug("purge is set to true. Not moving to Trash " + f);
@@ -63,4 +81,286 @@ public class FileUtils {
     }
     return result;
   }
+
+  /**
+   * Copies files between filesystems.
+   */
+  public static boolean copy(FileSystem srcFS, Path src,
+      FileSystem dstFS, Path dst,
+      boolean deleteSource,
+      boolean overwrite,
+      Configuration conf) throws IOException {
+    boolean copied = false;
+    boolean triedDistcp = false;
+
+    /* Run distcp if source file/dir is too big */
+    if (srcFS.getUri().getScheme().equals("hdfs")) {
+      ContentSummary srcContentSummary = srcFS.getContentSummary(src);
+      if (srcContentSummary.getFileCount() >
+            MetastoreConf.getLongVar(conf, ConfVars.REPL_COPYFILE_MAXNUMFILES)
+          && srcContentSummary.getLength() >
+            MetastoreConf.getLongVar(conf,ConfVars.REPL_COPYFILE_MAXSIZE)) {
+
+        LOG.info("Source is " + srcContentSummary.getLength() + " bytes. (MAX: " +
+            MetastoreConf.getLongVar(conf, ConfVars.REPL_COPYFILE_MAXSIZE) + ")");
+        LOG.info("Source is " + srcContentSummary.getFileCount() + " files. (MAX: " +
+            MetastoreConf.getLongVar(conf, ConfVars.REPL_COPYFILE_MAXNUMFILES) + ")");
+        LOG.info("Launch distributed copy (distcp) job.");
+        triedDistcp = true;
+        copied = distCp(srcFS, Collections.singletonList(src), dst, deleteSource, null, conf);
+      }
+    }
+    if (!triedDistcp) {
+      // Note : Currently, this implementation does not "fall back" to regular copy if distcp
+      // is tried and it fails. We depend upon that behaviour in cases like replication,
+      // wherein if distcp fails, there is good reason to not plod along with a trivial
+      // implementation, and fail instead.
+      copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf);
+    }
+    return copied;
+  }
+
+  private static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst,
+                                boolean deleteSource, String doAsUser,
+                                Configuration conf) throws IOException {
+    boolean copied;
+    if (doAsUser == null){
+      copied = HdfsUtils.runDistCp(srcPaths, dst, conf);
+    } else {
+      copied = HdfsUtils.runDistCpAs(srcPaths, dst, conf, doAsUser);
+    }
+    if (copied && deleteSource) {
+      for (Path path : srcPaths) {
+        srcFS.delete(path, true);
+      }
+    }
+    return copied;
+  }
+
+  /**
+   * Creates the directory and all necessary parent directories.
+   * @param fs FileSystem to use
+   * @param f path to create.
+   * @return true if directory created successfully.  False otherwise, including if it exists.
+   * @throws IOException exception in creating the directory
+   */
+  public static boolean mkdir(FileSystem fs, Path f) throws IOException {
+    LOG.info("Creating directory if it doesn't exist: " + f);
+    return fs.mkdirs(f);
+  }
+
+  /**
+   * Rename a file.  Unlike {@link FileSystem#rename(Path, Path)}, if the destPath already exists
+   * and is a directory, this will NOT move the sourcePath into it.  It will throw an IOException
+   * instead.
+   * @param fs file system paths are on
+   * @param sourcePath source file or directory to move
+   * @param destPath destination file name.  This must be a file and not an existing directory.
+   * @return result of fs.rename.
+   * @throws IOException if fs.rename throws it, or if destPath already exists.
+   */
+  public static boolean rename(FileSystem fs, Path sourcePath, Path destPath) throws IOException {
+    LOG.info("Renaming " + sourcePath + " to " + destPath);
+
+    // If destPath directory exists, rename call will move the sourcePath
+    // into destPath without failing. So check it before renaming.
+    if (fs.exists(destPath)) {
+      throw new IOException("Cannot rename the source path. The destination "
+          + "path already exists.");
+    }
+    return fs.rename(sourcePath, destPath);
+  }
+
+  // NOTE: This is for generating the internal path name for partitions. Users
+  // should always use the MetaStore API to get the path name for a partition.
+  // Users should not directly take partition values and turn it into a path
+  // name by themselves, because the logic below may change in the future.
+  //
+  // In the future, it's OK to add new chars to the escape list, and old data
+  // won't be corrupt, because the full path name in metastore is stored.
+  // In that case, Hive will continue to read the old data, but when it creates
+  // new partitions, it will use new names.
+  // edit : There are some use cases for which adding new chars does not seem
+  // to be backward compatible - Eg. if partition was created with name having
+  // a special char that you want to start escaping, and then you try dropping
+  // the partition with a hive version that now escapes the special char using
+  // the list below, then the drop partition fails to work.
+
+  private static BitSet charToEscape = new BitSet(128);
+  static {
+    for (char c = 0; c < ' '; c++) {
+      charToEscape.set(c);
+    }
+
+    /*
+     * ASCII 01-1F are HTTP control characters that need to be escaped.
+     * \u000A and \u000D are \n and \r, respectively.
+     */
+    char[] clist = new char[] {'\u0001', '\u0002', '\u0003', '\u0004',
+                               '\u0005', '\u0006', '\u0007', '\u0008', '\u0009', '\n', '\u000B',
+                               '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012',
+                               '\u0013', '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019',
+                               '\u001A', '\u001B', '\u001C', '\u001D', '\u001E', '\u001F',
+                               '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', '{',
+                               '[', ']', '^'};
+
+    for (char c : clist) {
+      charToEscape.set(c);
+    }
+  }
+
+  private static boolean needsEscaping(char c) {
+    return c >= 0 && c < charToEscape.size() && charToEscape.get(c);
+  }
+
+  public static String escapePathName(String path) {
+    return escapePathName(path, null);
+  }
+
+  /**
+   * Escapes a path name.
+   * @param path The path to escape.
+   * @param defaultPath
+   *          The default name for the path, if the given path is empty or null.
+   * @return An escaped path name.
+   */
+  public static String escapePathName(String path, String defaultPath) {
+
+    // __HIVE_DEFAULT_NULL__ is the system default value for null and empty string.
+    // TODO: we should allow user to specify default partition or HDFS file location.
+    if (path == null || path.length() == 0) {
+      if (defaultPath == null) {
+        //previously, when path is empty or null and no default path is specified,
+        // __HIVE_DEFAULT_PARTITION__ was the return value for escapePathName
+        return "__HIVE_DEFAULT_PARTITION__";
+      } else {
+        return defaultPath;
+      }
+    }
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < path.length(); i++) {
+      char c = path.charAt(i);
+      if (needsEscaping(c)) {
+        sb.append('%');
+        sb.append(String.format("%1$02X", (int) c));
+      } else {
+        sb.append(c);
+      }
+    }
+    return sb.toString();
+  }
+
+  public static String unescapePathName(String path) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < path.length(); i++) {
+      char c = path.charAt(i);
+      if (c == '%' && i + 2 < path.length()) {
+        int code = -1;
+        try {
+          code = Integer.parseInt(path.substring(i + 1, i + 3), 16);
+        } catch (Exception e) {
+          code = -1;
+        }
+        if (code >= 0) {
+          sb.append((char) code);
+          i += 2;
+          continue;
+        }
+      }
+      sb.append(c);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Get all file status from a root path and recursively go deep into certain levels.
+   *
+   * @param path
+   *          the root path
+   * @param level
+   *          the depth of directory to explore
+   * @param fs
+   *          the file system
+   * @return array of FileStatus
+   * @throws IOException
+   */
+  public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
+      throws IOException {
+
+    // if level is < 0, then return all files/directories under the specified path
+    if ( level < 0) {
+      List<FileStatus> result = new ArrayList<>();
+      try {
+        FileStatus fileStatus = fs.getFileStatus(path);
+        FileUtils.listStatusRecursively(fs, fileStatus, result);
+      } catch (IOException e) {
+        // globStatus() API returns empty FileStatus[] when the specified path
+        // does not exist. But getFileStatus() throw IOException. To mimic the
+        // similar behavior we will return empty array on exception. For external
+        // tables, the path of the table will not exists during table creation
+        return new FileStatus[0];
+      }
+      return result.toArray(new FileStatus[result.size()]);
+    }
+
+    // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
+    StringBuilder sb = new StringBuilder(path.toUri().getPath());
+    for (int i = 0; i < level; i++) {
+      sb.append(Path.SEPARATOR).append("*");
+    }
+    Path pathPattern = new Path(path, sb.toString());
+    return fs.globStatus(pathPattern, FileUtils.HIDDEN_FILES_PATH_FILTER);
+  }
+
+  /**
+   * Recursively lists status for all files starting from a particular directory (or individual file
+   * as base case).
+   *
+   * @param fs
+   *          file system
+   *
+   * @param fileStatus
+   *          starting point in file system
+   *
+   * @param results
+   *          receives enumeration of all files found
+   */
+  public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus,
+                                           List<FileStatus> results) throws IOException {
+
+    if (fileStatus.isDir()) {
+      for (FileStatus stat : fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_PATH_FILTER)) {
+        listStatusRecursively(fs, stat, results);
+      }
+    } else {
+      results.add(fileStatus);
+    }
+  }
+
+  public static String makePartName(List<String> partCols, List<String> vals) {
+    return makePartName(partCols, vals, null);
+  }
+
+  /**
+   * Makes a valid partition name.
+   * @param partCols The partition keys' names
+   * @param vals The partition values
+   * @param defaultStr
+   *         The default name given to a partition value if the respective value is empty or null.
+   * @return An escaped, valid partition name.
+   */
+  public static String makePartName(List<String> partCols, List<String> vals,
+                                    String defaultStr) {
+    StringBuilder name = new StringBuilder();
+    for (int i = 0; i < partCols.size(); i++) {
+      if (i > 0) {
+        name.append(Path.SEPARATOR);
+      }
+      name.append(escapePathName((partCols.get(i)).toLowerCase(), defaultStr));
+      name.append('=');
+      name.append(escapePathName(vals.get(i), defaultStr));
+    }
+    return name.toString();
+  }
 }
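
A short sketch of the path-escaping behaviour added in FileUtils above; the sample strings are arbitrary:

import org.apache.hadoop.hive.metastore.utils.FileUtils;

public class EscapeSketch {
  public static void main(String[] args) {
    // ':' (0x3A) and '/' (0x2F) are on the escape list, so they become %3A and %2F.
    String escaped = FileUtils.escapePathName("01:41/raw");
    // unescapePathName decodes the %XX sequences back to the original characters.
    String restored = FileUtils.unescapePathName(escaped);
    System.out.println(escaped + " -> " + restored);  // 01%3A41%2Fraw -> 01:41/raw

    // A null or empty value falls back to the default partition name.
    System.out.println(FileUtils.escapePathName(null));  // __HIVE_DEFAULT_PARTITION__
  }
}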

http://git-wip-us.apache.org/repos/asf/hive/blob/56083008/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java
index 7588c9f..c10e36f 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,9 +36,14 @@ import javax.security.auth.login.LoginException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 public class HdfsUtils {
   private static final Logger LOG = LoggerFactory.getLogger(HdfsUtils.class);
+  private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
 
   /**
    * Check the permissions on a file.
@@ -122,4 +129,72 @@ public class HdfsUtils {
     return false;
   }
 
+  public static boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf,
+                                    String doAsUser) throws IOException {
+    UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+        doAsUser, UserGroupInformation.getLoginUser());
+    try {
+      return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws Exception {
+          return runDistCp(srcPaths, dst, conf);
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public static boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf)
+      throws IOException {
+    DistCpOptions options = new DistCpOptions(srcPaths, dst);
+    options.setSyncFolder(true);
+    options.setSkipCRC(true);
+    options.preserve(DistCpOptions.FileAttribute.BLOCKSIZE);
+
+    // Creates the command-line parameters for distcp
+    List<String> params = constructDistCpParams(srcPaths, dst, conf);
+
+    try {
+      conf.setBoolean("mapred.mapper.new-api", true);
+      DistCp distcp = new DistCp(conf, options);
+
+      // HIVE-13704 states that we should use run() instead of execute() due to a hadoop known issue
+      // added by HADOOP-10459
+      if (distcp.run(params.toArray(new String[params.size()])) == 0) {
+        return true;
+      } else {
+        return false;
+      }
+    } catch (Exception e) {
+      throw new IOException("Cannot execute DistCp process: " + e, e);
+    } finally {
+      conf.setBoolean("mapred.mapper.new-api", false);
+    }
+  }
+
+  private static List<String> constructDistCpParams(List<Path> srcPaths, Path dst,
+                                                    Configuration conf) {
+    List<String> params = new ArrayList<>();
+    for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
+      String distCpOption = entry.getKey();
+      String distCpVal = entry.getValue();
+      params.add("-" + distCpOption);
+      if ((distCpVal != null) && (!distCpVal.isEmpty())){
+        params.add(distCpVal);
+      }
+    }
+    if (params.size() == 0){
+      // if no entries were added via conf, we initiate our defaults
+      params.add("-update");
+      params.add("-skipcrccheck");
+      params.add("-pb");
+    }
+    for (Path src : srcPaths) {
+      params.add(src.toString());
+    }
+    params.add(dst.toString());
+    return params;
+  }
+
 }
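
A sketch of how the "distcp.options." prefix read by constructDistCpParams above can steer the distcp invocation; the specific flags are ordinary distcp options, chosen only for illustration:

import org.apache.hadoop.conf.Configuration;

public class DistCpOptionsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Each "distcp.options.<flag>" entry becomes "-<flag>" plus its value (if non-empty)
    // on the distcp command line built by constructDistCpParams.
    conf.set("distcp.options.update", "");   // contributes -update
    conf.set("distcp.options.m", "20");      // contributes -m 20 (limit map tasks)
    // If no distcp.options.* entries are present, the defaults -update -skipcrccheck -pb are used.
  }
}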

http://git-wip-us.apache.org/repos/asf/hive/blob/56083008/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 3ef7e51..37fc56b 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -38,4 +38,19 @@ public class MetaStoreUtils {
     throw new MetaException(exInfo);
   }
 
+  public static String encodeTableName(String name) {
+    // The encoding method is simple, e.g., replace
+    // all the special characters with the corresponding number in ASCII.
+    // Note that unicode is not supported in table names. And we have explicit
+    // checks for it.
+    StringBuilder sb = new StringBuilder();
+    for (char ch : name.toCharArray()) {
+      if (Character.isLetterOrDigit(ch) || ch == '_') {
+        sb.append(ch);
+      } else {
+        sb.append('-').append((int) ch).append('-');
+      }
+    }
+    return sb.toString();
+  }
 }
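
A quick illustration of the new encodeTableName helper; the input is chosen arbitrarily:

import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;

public class EncodeTableNameSketch {
  public static void main(String[] args) {
    // Letters, digits and '_' pass through unchanged; any other character is
    // replaced by its ASCII code wrapped in dashes, e.g. '!' (33) becomes -33-.
    System.out.println(MetaStoreUtils.encodeTableName("sales_2017!"));  // sales_2017-33-
  }
}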

