From: sershe@apache.org
To: commits@hive.apache.org
Subject: hive git commit: HIVE-15027 : make sure export takes MM information into account (Sergey Shelukhin)
Date: Fri, 28 Oct 2016 02:08:49 +0000 (UTC)

Repository: hive
Updated Branches:
  refs/heads/hive-14535 e083d33ac -> 2e602596f


HIVE-15027 : make sure export takes MM information into account (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2e602596
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2e602596
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2e602596

Branch: refs/heads/hive-14535
Commit: 2e602596f7af6c302fd23628d4337673ca38be86
Parents: e083d33
Author: Sergey Shelukhin
Authored: Thu Oct 27 19:08:33 2016 -0700
Committer: Sergey Shelukhin
Committed: Thu Oct 27 19:08:33 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/metastore/ObjectStore.java      |  1 -
 .../apache/hadoop/hive/ql/exec/CopyTask.java    | 15 +++--
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 30 +++++++++
 .../hive/ql/io/CombineHiveInputFormat.java      | 29 ++++++---
 .../hive/ql/parse/ExportSemanticAnalyzer.java   | 66 +++++++++++++++++---
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  4 +-
 .../ql/plan/ConditionalResolverMergeFiles.java  |  1 +
 .../apache/hadoop/hive/ql/plan/CopyWork.java    | 53 ++++++++++++----
 .../org/apache/hadoop/hive/ql/plan/MapWork.java | 10 +++
 9 files changed, 171 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index a1b3a09..8ad7059 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -636,7 +636,6 @@ public class ObjectStore implements RawStore, Configurable {
       transactionStatus = TXN_STATUS.COMMITED;
       try {
-        LOG.error("TODO# grrrrr");
         currentTransaction.commit();
       } catch (Exception ex) {
         Throwable candidate = ex;
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index a8a44bc..9f89ea5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -53,19 +53,24 @@ public class CopyTask extends Task<CopyWork> implements Serializable {

   @Override
   public int execute(DriverContext driverContext) {
+    Path[] from = work.getFromPaths(), to = work.getToPaths();
+    for (int i = 0; i < from.length; ++i) {
+      int result = copyOnePath(from[i], to[i]);
+      if (result != 0) return result;
+    }
+    return 0;
+  }
+
+  protected int copyOnePath(Path fromPath, Path toPath) {
     FileSystem dstFs = null;
-    Path toPath = null;
     try {
-      Path fromPath = work.getFromPath();
-      toPath = work.getToPath();
-
       console.printInfo("Copying data from " + fromPath.toString(), " to "
          + toPath.toString());

       FileSystem srcFs = fromPath.getFileSystem(conf);
       dstFs = toPath.getFileSystem(conf);

-      FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.isSourceMm());
+      FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.doSkipSourceMmDirs());
       if (srcs == null || srcs.length == 0) {
         if (work.isErrorOnSrcEmpty()) {
           console.printError("No files matching path: " + fromPath.toString());
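
Illustration (not part of the patch): the new execute() makes CopyTask a fail-fast
loop over parallel source/destination arrays, delegating each pair to copyOnePath().
A minimal standalone sketch of that contract; the PathCopier interface and the sample
paths are hypothetical:

    import org.apache.hadoop.fs.Path;

    public class PairwiseCopySketch {
      /** Hypothetical stand-in for CopyTask.copyOnePath(). */
      interface PathCopier {
        int copy(Path from, Path to);
      }

      /** Mirrors the new execute(): copy pairs in order, stop at the first failure. */
      static int copyAll(Path[] from, Path[] to, PathCopier copier) {
        for (int i = 0; i < from.length; ++i) {
          int result = copier.copy(from[i], to[i]);
          if (result != 0) {
            return result; // later pairs are not attempted
          }
        }
        return 0;
      }

      public static void main(String[] args) {
        Path[] from = { new Path("/src/a"), new Path("/src/b") };
        Path[] to = { new Path("/dst/a"), new Path("/dst/b") };
        // A copier that always succeeds; the real task moves bytes between filesystems.
        System.out.println(copyAll(from, to, (f, t) -> 0)); // prints 0
      }
    }
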
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 6774d4d..8e506aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -4077,4 +4077,34 @@ public final class Utilities {
     }
   }

+  /**
+   * @return the complete list of valid MM directories under a table/partition path; null
+   * if the entire directory is valid (has no uncommitted/temporary files).
+   */
+  public static List<Path> getValidMmDirectoriesFromTableOrPart(Path path, Configuration conf,
+      ValidWriteIds ids, int lbLevels) throws IOException {
+    Utilities.LOG14535.info("Looking for valid MM paths under " + path);
+    // NULL means this directory is entirely valid.
+    List<Path> result = null;
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus[] children = (lbLevels == 0) ? fs.listStatus(path)
+        : fs.globStatus(new Path(path, StringUtils.repeat("*" + Path.SEPARATOR, lbLevels) + "*"));
+    for (int i = 0; i < children.length; ++i) {
+      FileStatus file = children[i];
+      Path childPath = file.getPath();
+      Long writeId = ValidWriteIds.extractWriteId(childPath);
+      if (!file.isDirectory() || writeId == null || !ids.isValid(writeId)) {
+        Utilities.LOG14535.info("Skipping path " + childPath);
+        if (result == null) {
+          result = new ArrayList<>(children.length - 1);
+          for (int j = 0; j < i; ++j) {
+            result.add(children[j].getPath());
+          }
+        }
+      } else if (result != null) {
+        result.add(childPath);
+      }
+    }
+    return result;
+  }
 }
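
Illustration (not part of the patch): the contract of getValidMmDirectoriesFromTableOrPart()
is easy to miss -- null means "every child is a valid MM directory, copy the tree as-is",
while a (possibly empty) list means "copy only these". A self-contained sketch of the same
filtering logic over plain strings; the mm_<writeId> naming and the validity predicate are
simplifications (the real parsing lives in ValidWriteIds):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.LongPredicate;

    public class ValidDirsSketch {
      /** Null if all children are valid MM dirs; otherwise only the valid ones. */
      static List<String> filterValid(String[] children, LongPredicate isValidWriteId) {
        List<String> result = null; // null == everything so far is valid
        for (int i = 0; i < children.length; ++i) {
          Long writeId = extractWriteId(children[i]);
          if (writeId == null || !isValidWriteId.test(writeId)) {
            if (result == null) { // first invalid child: materialize the valid prefix
              result = new ArrayList<>(children.length - 1);
              for (int j = 0; j < i; ++j) {
                result.add(children[j]);
              }
            }
          } else if (result != null) {
            result.add(children[i]);
          }
        }
        return result;
      }

      /** Simplified mm_<writeId> parsing for the sketch. */
      static Long extractWriteId(String name) {
        return name.startsWith("mm_") ? Long.valueOf(name.substring(3)) : null;
      }

      public static void main(String[] args) {
        String[] children = { "mm_1", "mm_2", "mm_3" };
        System.out.println(filterValid(children, id -> true));    // null: all valid
        System.out.println(filterValid(children, id -> id != 2)); // [mm_1, mm_3]
      }
    }
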
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index cc1de11..15d6b9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.io;

+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -85,20 +87,22 @@ public class CombineHiveInputFormat
     public Set<Integer> call() throws Exception {
       Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
       for (int i = 0; i < length; i++) {
-        PartitionDesc part =
-            HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+        PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
             pathToPartitionInfo, paths[i + start],
             IOPrepareCache.get().allocatePartitionDescMap());
         // Use HiveInputFormat if any of the paths is not splittable
@@ -107,12 +111,16 @@ public class CombineHiveInputFormat
     List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
     try {
+      boolean isMerge = mrwork.isMergeFromResolver();
       for (int i = 0; i < numThreads; i++) {
         int start = i * numPathPerThread;
         int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
-        futureList.add(executor.submit(
-            new CheckNonCombinablePathCallable(paths, start, length, job)));
+        futureList.add(executor.submit(new CheckNonCombinablePathCallable(
+            paths, start, length, job, isMerge)));
       }
       Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
       for (Future<Set<Integer>> future : futureList) {
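
Illustration (not part of the patch): the visible part of this change computes isMerge
once on the submitting thread and passes it to each CheckNonCombinablePathCallable via
its new constructor argument, so every worker sees the same precomputed value. A generic
sketch of that pattern with hypothetical names:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class SharedFlagSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        boolean isMerge = true; // computed once, outside the worker threads
        List<Future<Integer>> futures = new ArrayList<>();
        for (int chunk = 0; chunk < 4; chunk++) {
          final int start = chunk;
          // Each worker receives the precomputed flag as plain captured state.
          Callable<Integer> task = () -> isMerge ? start : -start;
          futures.add(executor.submit(task));
        }
        for (Future<Integer> f : futures) {
          System.out.println(f.get());
        }
        executor.shutdown();
      }
    }
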
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 475f2c9..12dea9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -18,6 +18,16 @@
 package org.apache.hadoop.hive.ql.parse;

+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import org.apache.hadoop.hive.common.ValidWriteIds;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
@@ -171,29 +181,69 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
           .getMsg("Exception while writing out the local file"), e);
     }

-    if (!(replicationSpec.isMetadataOnly() || (ts == null))) {
+    if (replicationSpec.isMetadataOnly() || (ts == null)) return;
+
+    try {
       Path parentPath = new Path(toURI);
+      boolean isMmTable = MetaStoreUtils.isInsertOnlyTable(ts.tableHandle.getParameters());
+      Utilities.LOG14535.info("Exporting table " + ts.tableName + " / "
+          + ts.tableHandle.getTableName() + ": " + isMmTable);
+
+      int lbLevels = isMmTable && ts.tableHandle.isStoredAsSubDirectories()
+          ? ts.tableHandle.getSkewedColNames().size() : 0;
+      ValidWriteIds ids = isMmTable ? db.getValidWriteIdsForTable(
+          ts.tableHandle.getDbName(), ts.tableHandle.getTableName()) : null;
       if (ts.tableHandle.isPartitioned()) {
         for (Partition partition : partitions) {
           Path fromPath = partition.getDataLocation();
           Path toPartPath = new Path(parentPath, partition.getName());
-          Task<? extends Serializable> rTask = TaskFactory.get(
-              new CopyWork(fromPath, toPartPath, false),
-              conf);
-          rootTasks.add(rTask);
+          CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toPartPath);
+          rootTasks.add(TaskFactory.get(cw, conf));
           inputs.add(new ReadEntity(partition));
         }
       } else {
         Path fromPath = ts.tableHandle.getDataLocation();
         Path toDataPath = new Path(parentPath, "data");
-        Task<? extends Serializable> rTask = TaskFactory.get(new CopyWork(
-            fromPath, toDataPath, false), conf);
-        rootTasks.add(rTask);
+        CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toDataPath);
+        rootTasks.add(TaskFactory.get(cw, conf));
         inputs.add(new ReadEntity(ts.tableHandle));
       }
       outputs.add(toWriteEntity(parentPath));
+    } catch (HiveException | IOException ex) {
+      throw new SemanticException(ex);
     }
   }

+  private CopyWork createCopyWork(boolean isMmTable, int lbLevels, ValidWriteIds ids,
+      Path fromPath, Path toDataPath) throws IOException {
+    List<Path> validPaths = null;
+    if (isMmTable) {
+      fromPath = fromPath.getFileSystem(conf).makeQualified(fromPath);
+      validPaths = Utilities.getValidMmDirectoriesFromTableOrPart(fromPath, conf, ids, lbLevels);
+    }
+    if (validPaths == null) {
+      return new CopyWork(fromPath, toDataPath, false); // Not MM, or no need to skip anything.
+    } else {
+      return createCopyWorkForValidPaths(fromPath, toDataPath, validPaths);
+    }
+  }

+  private CopyWork createCopyWorkForValidPaths(
+      Path fromPath, Path toPartPath, List<Path> validPaths) {
+    Path[] from = new Path[validPaths.size()], to = new Path[validPaths.size()];
+    int i = 0;
+    String fromPathStr = fromPath.toString();
+    if (!fromPathStr.endsWith(Path.SEPARATOR)) {
+      fromPathStr += "/";
+    }
+    for (Path validPath : validPaths) {
+      from[i] = validPath;
+      // TODO: assumes the results are already qualified.
+      to[i] = new Path(toPartPath, validPath.toString().substring(fromPathStr.length()));
+      Utilities.LOG14535.info("Will copy " + from[i] + " to " + to[i]
+          + " based on dest " + toPartPath + ", from " + fromPathStr + ", subpath " + validPath);
+      ++i;
+    }
+    return new CopyWork(from, to);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 87b85c8..8aa076f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -347,7 +347,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         + mmWriteId + " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName()));
     CopyWork cv = new CopyWork(dataPath, destPath, false);
-    cv.setIsSourceMm(isSourceMm);
+    cv.setSkipSourceMmDirs(isSourceMm);
     LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
         Utilities.getTableDesc(table), new TreeMap<String, String>(), replace, mmWriteId);
     MoveWork mv = new MoveWork(getInputs(), getOutputs(), loadTableWork, null, false);
@@ -411,7 +411,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
         + mmWriteId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
     CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
-    cw.setIsSourceMm(isSourceMm);
+    cw.setSkipSourceMmDirs(isSourceMm);
     DDLWork dw = new DDLWork(getInputs(), getOutputs(), addPartitionDesc);
     LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
         partSpec.getPartSpec(), true, mmWriteId);

http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index 4635f18..7fcb1ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -330,6 +330,7 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
     mWork.setMinSplitSize(targetSize);
     mWork.setMinSplitSizePerNode(targetSize);
     mWork.setMinSplitSizePerRack(targetSize);
+    mWork.setIsMergeFromResolver(true);
   }

   private static class AverageSize {
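
Illustration (not part of the patch): the subtle step in createCopyWorkForValidPaths()
is rebasing each valid MM directory from the (qualified) table location onto the export
destination via a string prefix cut. A runnable sketch of just that computation; the
hostnames and directory names are made up:

    import org.apache.hadoop.fs.Path;

    public class RebaseSketch {
      public static void main(String[] args) {
        Path fromPath = new Path("hdfs://nn:8020/warehouse/t");    // qualified source root
        Path toPartPath = new Path("hdfs://nn:8020/export/data");  // export destination
        Path validPath = new Path(fromPath, "mm_5");               // one valid MM directory

        String fromPathStr = fromPath.toString();
        if (!fromPathStr.endsWith(Path.SEPARATOR)) {
          fromPathStr += Path.SEPARATOR;
        }
        // Keep the subpath relative to the source root, re-rooted at the destination.
        Path to = new Path(toPartPath, validPath.toString().substring(fromPathStr.length()));
        System.out.println(to); // hdfs://nn:8020/export/data/mm_5
      }
    }
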
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index 2e484ba..c08911f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -30,10 +30,10 @@
 import org.apache.hadoop.hive.ql.plan.Explain.Level;

 @Explain(displayName = "Copy", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
 public class CopyWork implements Serializable {
   private static final long serialVersionUID = 1L;
-  private Path fromPath;
-  private Path toPath;
+  private Path[] fromPath;
+  private Path[] toPath;
   private boolean errorOnSrcEmpty;
-  private boolean isMm = false;
+  private boolean isSkipMmDirs = false;

   public CopyWork() {
   }
@@ -43,18 +43,45 @@ public class CopyWork implements Serializable {
   }

   public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) {
+    this(new Path[] { fromPath }, new Path[] { toPath });
+    this.setErrorOnSrcEmpty(errorOnSrcEmpty);
+  }
+
+  public CopyWork(final Path[] fromPath, final Path[] toPath) {
+    if (fromPath.length != toPath.length) {
+      throw new RuntimeException(
+          "Cannot copy " + fromPath.length + " paths into " + toPath.length + " paths");
+    }
     this.fromPath = fromPath;
     this.toPath = toPath;
-    this.setErrorOnSrcEmpty(errorOnSrcEmpty);
   }
-
+
+  // Keep backward compat in explain for single-file copy tasks.
   @Explain(displayName = "source", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-  public Path getFromPath() {
-    return fromPath;
+  public Path getFromPathExplain() {
+    return (fromPath == null || fromPath.length > 1) ? null : fromPath[0];
   }

   @Explain(displayName = "destination", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-  public Path getToPath() {
+  public Path getToPathExplain() {
+    return (toPath == null || toPath.length > 1) ? null : toPath[0];
+  }
+
+  @Explain(displayName = "sources", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public Path[] getFromPathsExplain() {
+    return (fromPath != null && fromPath.length > 1) ? fromPath : null;
+  }
+
+  @Explain(displayName = "destinations", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public Path[] getToPathsExplain() {
+    return (toPath != null && toPath.length > 1) ? toPath : null;
+  }
+
+  public Path[] getFromPaths() {
+    return fromPath;
+  }
+
+  public Path[] getToPaths() {
     return toPath;
   }

@@ -66,12 +93,14 @@ public class CopyWork implements Serializable {
     return errorOnSrcEmpty;
   }

-  public void setIsSourceMm(boolean isMm) {
-    this.isMm = isMm;
+  /** Whether the copy should ignore MM directories in the source, and copy their content to
+   * destination directly, rather than copying the directories themselves. */
+  public void setSkipSourceMmDirs(boolean isMm) {
+    this.isSkipMmDirs = isMm;
   }

-  public boolean isSourceMm() {
-    return isMm ;
+  public boolean doSkipSourceMmDirs() {
+    return isSkipMmDirs ;
   }
 }
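
Illustration (not part of the patch): how the reworked CopyWork is meant to be used --
the old single-pair constructor now delegates to the new array form, and the paired
getFromPathExplain()/getFromPathsExplain() accessors keep explain output unchanged for
single copies. The sample paths below are hypothetical:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.plan.CopyWork;

    public class CopyWorkUsageSketch {
      public static void main(String[] args) {
        // Back-compat form: explain still renders "source" / "destination".
        CopyWork single = new CopyWork(new Path("/src/data"), new Path("/dst/data"), false);

        // Array form added by this patch: explain renders "sources" / "destinations".
        Path[] from = { new Path("/src/t/mm_1"), new Path("/src/t/mm_3") };
        Path[] to = { new Path("/dst/t/mm_1"), new Path("/dst/t/mm_3") };
        CopyWork multi = new CopyWork(from, to);

        // Mismatched lengths fail fast in the constructor:
        // new CopyWork(new Path[2], new Path[1]) -> RuntimeException
        System.out.println(single.getFromPaths().length + ", " + multi.getFromPaths().length);
      }
    }
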
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 1be4d84..5a81a62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -148,6 +148,8 @@ public class MapWork extends BaseWork {
   /** Whether LLAP IO will be used for inputs. */
   private String llapIoDesc;

+  private boolean isMergeFromResolver;
+
   public MapWork() {}

   public MapWork(String name) {
@@ -718,4 +720,12 @@ public class MapWork extends BaseWork {
   public VectorizedRowBatch getVectorizedRowBatch() {
     return vectorizedRowBatch;
   }
+
+  public void setIsMergeFromResolver(boolean b) {
+    this.isMergeFromResolver = b;
+  }
+
+  public boolean isMergeFromResolver() {
+    return this.isMergeFromResolver;
+  }
 }