From: xuefu@apache.org
To: commits@hive.apache.org
Date: Wed, 16 Sep 2015 19:18:29 -0000
Subject: [11/50] [abbrv] hive git commit: HIVE-11645 : Add in-place updates for dynamic partitions loading (Ashutosh Chauhan via Prasanth J)

HIVE-11645 : Add in-place updates for dynamic partitions loading (Ashutosh Chauhan via Prasanth J)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f4361bf3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f4361bf3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f4361bf3

Branch: refs/heads/spark
Commit: f4361bf30689c4767e966e11c610f7ead632415a
Parents: 9fe8802
Author: Ashutosh Chauhan
Authored: Thu Sep 10 14:52:43 2015 -0700
Committer: Ashutosh Chauhan
Committed: Thu Sep 10 14:52:43 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/MoveTask.java    | 12 ++--
 .../apache/hadoop/hive/ql/exec/StatsTask.java   | 13 +++-
 .../hadoop/hive/ql/exec/tez/InPlaceUpdates.java | 65 +++++++++++++++++++
 .../hadoop/hive/ql/exec/tez/TezJobMonitor.java  | 66 ++------------------
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 25 ++++++--
 5 files changed, 106 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 50c4a96..a1f8973 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -361,7 +361,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         if (dps != null && dps.size() > 0) {
           pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
         }
-
+        console.printInfo(System.getProperty("line.separator"));
         long startTime = System.currentTimeMillis();
         // load the list of DP partitions and return the list of partition specs
         // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
@@ -381,8 +381,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
             isSkewedStoredAsDirs(tbd),
             work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
             SessionState.get().getTxnMgr().getCurrentTxnId());
-        console.printInfo("\t Time taken for load dynamic partitions : " +
-            (System.currentTimeMillis() - startTime));
+
+        console.printInfo("\t Time taken to load dynamic partitions: " +
+            (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
 
         if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
           throw new HiveException("This query creates no partitions." +
@@ -425,11 +426,10 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
               SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
                   table.getCols());
             }
-
-            console.printInfo("\tLoading partition " + entry.getKey());
+            LOG.info("\tLoading partition " + entry.getKey());
           }
           console.printInfo("\t Time taken for adding to write entity : " +
-              (System.currentTimeMillis() - startTime));
+              (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
           dc = null; // reset data container to prevent it being added again.
         } else { // static partitions
           List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),


http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 2a8167a..41ece04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -182,8 +183,10 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
         parameters.put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, StatsSetupConst.TRUE);
 
         db.alterTable(tableFullName, new Table(tTable));
-
-        console.printInfo("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
+        if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+          console.printInfo("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
+        }
+        LOG.info("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
       } else {
         // Partitioned table:
         // Need to get the old stats of the partition
@@ -215,7 +218,11 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
           parameters.put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, StatsSetupConst.TRUE);
 
           updates.add(new Partition(table, tPart));
-          console.printInfo("Partition " + tableFullName + partn.getSpec() +
+          if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+            console.printInfo("Partition " + tableFullName + partn.getSpec() +
+            " stats: [" + toString(parameters) + ']');
+          }
+          LOG.info("Partition " + tableFullName + partn.getSpec() +
             " stats: [" + toString(parameters) + ']');
         }
         if (!updates.isEmpty()) {
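
The MoveTask and StatsTask hunks above share one pattern: per-item messages always go to the task log, but reach the console only when hive.tez.exec.print.summary is enabled. A minimal sketch of that pattern, assuming the usual commons-logging and SessionState setup (the SummaryLogging class and logSummary method are illustrative names, not part of this commit):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class SummaryLogging {
      private static final Log LOG = LogFactory.getLog(SummaryLogging.class);

      // Print to the console only when the user asked for an execution summary;
      // always record the message in the log so nothing is lost.
      static void logSummary(HiveConf conf, String message) {
        if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
          SessionState.getConsole().printInfo(message);
        }
        LOG.info(message);
      }
    }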


http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
new file mode 100644
index 0000000..6ecfe71
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
@@ -0,0 +1,65 @@
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.fusesource.jansi.Ansi.ansi;
+import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
+import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
+import static org.fusesource.jansi.internal.CLibrary.isatty;
+
+import java.io.PrintStream;
+
+import jline.TerminalFactory;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.fusesource.jansi.Ansi;
+
+public class InPlaceUpdates {
+
+  private static final int MIN_TERMINAL_WIDTH = 80;
+
+  static boolean isUnixTerminal() {
+
+    String os = System.getProperty("os.name");
+    if (os.startsWith("Windows")) {
+      // we do not support Windows, we will revisit this if we really need it for windows.
+      return false;
+    }
+
+    // We must be on some unix variant..
+    // check if standard out is a terminal
+    try {
+      // isatty system call will return 1 if the file descriptor is terminal else 0
+      if (isatty(STDOUT_FILENO) == 0) {
+        return false;
+      }
+      if (isatty(STDERR_FILENO) == 0) {
+        return false;
+      }
+    } catch (NoClassDefFoundError ignore) {
+      // These errors happen if the JNI lib is not available for your platform.
+      return false;
+    } catch (UnsatisfiedLinkError ignore) {
+      // These errors happen if the JNI lib is not available for your platform.
+      return false;
+    }
+    return true;
+  }
+
+  public static boolean inPlaceEligible(HiveConf conf) {
+    boolean inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
+
+    // we need at least 80 chars wide terminal to display in-place updates properly
+    return inPlaceUpdates && !SessionState.getConsole().getIsSilent() && isUnixTerminal()
+        && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
+  }
+
+  public static void reprintLine(PrintStream out, String line) {
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+    out.flush();
+  }
+
+  public static void rePositionCursor(PrintStream ps) {
+    ps.print(ansi().cursorUp(0).toString());
+    ps.flush();
+  }
+}
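
For context, a minimal sketch of how these helpers combine into a single repainted status line, mirroring the pattern Hive.loadDynamicPartitions uses further below (the ProgressDemo class and the sleep are illustrative only; assumes jansi is on the classpath and a wide unix terminal, as checked by inPlaceEligible):

    import java.io.PrintStream;

    import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;

    public class ProgressDemo {
      public static void main(String[] args) throws InterruptedException {
        PrintStream ps = System.out;
        int total = 20;
        ps.println();                            // reserve one line for the status display
        for (int done = 1; done <= total; done++) {
          Thread.sleep(100);                     // stand-in for real work
          InPlaceUpdates.rePositionCursor(ps);   // move the cursor back up to the status line
          InPlaceUpdates.reprintLine(ps, "Loaded : " + done + "/" + total + " partitions.");
        }
      }
    }

reprintLine erases the line and reprints it followed by a newline, so each iteration first moves the cursor one line up and then redraws; the net effect is one status line updating in place instead of twenty scrolled lines. The blank line printed up front plays the same role as the console.printInfo(System.getProperty("line.separator")) added to MoveTask above.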

http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 1a4decf..1e1603b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -20,9 +20,6 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
 import static org.fusesource.jansi.Ansi.ansi;
-import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.isatty;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -74,7 +71,7 @@ import jline.TerminalFactory;
 
 public class TezJobMonitor {
 
   private static final String CLASS_NAME = TezJobMonitor.class.getName();
-  private static final int MIN_TERMINAL_WIDTH = 80;
+
   private static final int COLUMN_1_WIDTH = 16;
   private static final int SEPARATOR_WIDTH = 80;
@@ -156,42 +153,13 @@ public class TezJobMonitor {
     }
   }
 
-  private static boolean isUnixTerminal() {
-
-    String os = System.getProperty("os.name");
-    if (os.startsWith("Windows")) {
-      // we do not support Windows, we will revisit this if we really need it for windows.
-      return false;
-    }
-
-    // We must be on some unix variant..
-    // check if standard out is a terminal
-    try {
-      // isatty system call will return 1 if the file descriptor is terminal else 0
-      if (isatty(STDOUT_FILENO) == 0) {
-        return false;
-      }
-      if (isatty(STDERR_FILENO) == 0) {
-        return false;
-      }
-    } catch (NoClassDefFoundError ignore) {
-      // These errors happen if the JNI lib is not available for your platform.
-      return false;
-    } catch (UnsatisfiedLinkError ignore) {
-      // These errors happen if the JNI lib is not available for your platform.
-      return false;
-    }
-    return true;
-  }
-
   /**
    * NOTE: Use this method only if isUnixTerminal is true.
    * Erases the current line and prints the given line.
    * @param line - line to print
    */
   public void reprintLine(String line) {
-    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
-    out.flush();
+    InPlaceUpdates.reprintLine(out, line);
     lines++;
   }
@@ -234,15 +202,6 @@ public class TezJobMonitor {
   }
 
   /**
-   * NOTE: Use this method only if isUnixTerminal is true.
-   * Gets the width of the terminal
-   * @return - width of terminal
-   */
-  public int getTerminalWidth() {
-    return TerminalFactory.get().getWidth();
-  }
-
-  /**
    * monitorExecution handles status printing, failures during execution and final status retrieval.
    *
    * @param dagClient client that was used to kick off the job
@@ -265,26 +224,11 @@ public class TezJobMonitor {
     Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
     Heartbeater heartbeater = new Heartbeater(txnMgr, conf);
     long startTime = 0;
-    boolean isProfileEnabled = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
+    boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
       Utilities.isPerfOrAboveLogging(conf);
 
-    boolean inPlaceUpdates = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
-    boolean wideTerminal = false;
-    boolean isTerminal = inPlaceUpdates == true ? isUnixTerminal() : false;
-
-    // we need at least 80 chars wide terminal to display in-place updates properly
-    if (isTerminal) {
-      if (getTerminalWidth() >= MIN_TERMINAL_WIDTH) {
-        wideTerminal = true;
-      }
-    }
-
-    boolean inPlaceEligible = false;
-    if (inPlaceUpdates && isTerminal && wideTerminal && !console.getIsSilent()) {
-      inPlaceEligible = true;
-    }
+    boolean inPlaceEligible = InPlaceUpdates.inPlaceEligible(conf);
 
     shutdownList.add(dagClient);
-    console.printInfo("\n");
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
@@ -470,7 +414,7 @@ public class TezJobMonitor {
       DAGClient dagClient, HiveConf conf, DAG dag) {
 
     /* Strings for headers and counters */
-    String hiveCountersGroup = conf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
     Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
     TezCounters hiveCounters = null;
     try {
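
The conf.getBoolVar(conf, ...) and conf.getVar(conf, ...) fixes above deserve a note: the overloads that take a Configuration argument are static on HiveConf, and calling a static method through an instance reference compiles in Java but reads misleadingly. Both corrected forms below are real HiveConf accessors; the demo class name is illustrative:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ConfAccessDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // static accessor, called through the class name as the diff now does
        boolean viaStatic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY);
        // equivalent instance accessor that needs no redundant conf argument
        boolean viaInstance = conf.getBoolVar(HiveConf.ConfVars.TEZ_EXEC_SUMMARY);
        System.out.println(viaStatic == viaInstance); // always true
      }
    }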

http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index c449aee..c78e8f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.metadata;
 
 import com.google.common.collect.Sets;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -81,6 +82,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.FunctionTask;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
@@ -101,6 +103,7 @@ import org.apache.thrift.TException;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -124,7 +127,6 @@ import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM;
 import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
 import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
 
-
 /**
  * This class has functions that implement meta data/DDL operations using calls
  * to the metastore.
@@ -1606,22 +1608,31 @@ private void constructOneLBLocationMap(FileStatus fSta,
       }
     }
 
-    if (validPartitions.size() == 0) {
+    int partsToLoad = validPartitions.size();
+    if (partsToLoad == 0) {
       LOG.warn("No partition is generated by dynamic partitioning");
     }
 
-    if (validPartitions.size() > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
-      throw new HiveException("Number of dynamic partitions created is " + validPartitions.size()
+    if (partsToLoad > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
+      throw new HiveException("Number of dynamic partitions created is " + partsToLoad
           + ", which is more than "
           + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)
           +". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-          + " to at least " + validPartitions.size() + '.');
+          + " to at least " + partsToLoad + '.');
     }
 
     Table tbl = getTable(tableName);
     // for each dynamically created DP directory, construct a full partition spec
     // and load the partition based on that
     Iterator<Path> iter = validPartitions.iterator();
+    LOG.info("Going to load " + partsToLoad + " partitions.");
+    PrintStream ps = null;
+    boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
+        && InPlaceUpdates.inPlaceEligible(conf);
+    if(inPlaceEligible) {
+      ps = SessionState.getConsole().getInfoStream();
+    }
+    int partitionsLoaded = 0;
     while (iter.hasNext()) {
       // get the dynamically created directory
       Path partPath = iter.next();
@@ -1634,6 +1645,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
         Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
             holdDDLTime, true, listBucketingEnabled, false, isAcid);
         partitionsMap.put(fullPartSpec, newPartition);
+        if (inPlaceEligible) {
+          InPlaceUpdates.rePositionCursor(ps);
+          InPlaceUpdates.reprintLine(ps, "Loaded : " + ++partitionsLoaded + "/" + partsToLoad +" partitions.");
+        }
         LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
       }
 
       if (isAcid) {
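
The configuration surface this commit touches is small. A hedged sketch of flipping the relevant knobs (ConfVars names taken from the diff; whether the in-place display actually engages still depends on the terminal checks in InPlaceUpdates and, for partition loading, on trash being disabled via fs.trash.interval <= 0):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;

    public class InPlaceConfigDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // enable the single-line repaint used by TezJobMonitor and loadDynamicPartitions
        conf.setBoolVar(HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS, true);
        // route per-table/partition stats lines to the console as well as the log
        conf.setBoolVar(HiveConf.ConfVars.TEZ_EXEC_SUMMARY, true);
        // inPlaceEligible also consults the session console, jansi isatty, and jline width
        System.out.println("in-place eligible: " + InPlaceUpdates.inPlaceEligible(conf));
      }
    }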