From: weiz@apache.org
To: commits@hive.apache.org
Date: Mon, 08 May 2017 22:17:29 -0000
Subject: [12/50] [abbrv] hive git commit: HIVE-16552: Limit the number of tasks a Spark job may contain (Reviewed by Rui)

HIVE-16552: Limit the number of tasks a Spark job may contain (Reviewed by Rui)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c6b5ad66
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c6b5ad66
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c6b5ad66

Branch: refs/heads/hive-14535
Commit: c6b5ad663d235c15fc5bb5a24a1d3e9ac0d05140
Parents: 9e9356b
Author: Xuefu Zhang
Authored: Thu May 4 09:31:28 2017 -0700
Committer: Xuefu Zhang
Committed: Thu May 4 09:31:28 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../test/resources/testconfiguration.properties |  4 +-
 .../hadoop/hive/cli/control/CliConfigs.java     |  1 +
 .../hadoop/hive/ql/exec/spark/SparkTask.java    |  6 ++
 .../spark/status/RemoteSparkJobMonitor.java     | 15 +++-
 .../ql/exec/spark/status/SparkJobMonitor.java   | 10 ++-
 .../clientnegative/spark_job_max_tasks.q        |  6 ++
 .../spark/spark_job_max_tasks.q.out             | 77 ++++++++++++++++++++
 8 files changed, 118 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
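In brief: hive.spark.job.max.tasks (default -1, meaning no limit) caps how many tasks a single Spark job may contain. The first time the job monitor sees a non-empty progress map, it sums the total task count across all stages; if the sum exceeds the cap, the monitor returns code 4 and SparkTask cancels the job. A minimal, self-contained sketch of that check (StageProgress here is a stand-in for Hive's SparkStageProgress, not the real class):

  import java.util.Arrays;
  import java.util.List;

  public class TaskLimitCheck {
    // Stand-in for SparkStageProgress, which exposes getTotalTaskCount().
    static class StageProgress {
      private final int totalTaskCount;
      StageProgress(int totalTaskCount) { this.totalTaskCount = totalTaskCount; }
      int getTotalTaskCount() { return totalTaskCount; }
    }

    // Mirrors SparkJobMonitor#getTotalTaskCount: sum the per-stage task totals.
    static int getTotalTaskCount(List<StageProgress> stages) {
      int totalTasks = 0;
      for (StageProgress progress : stages) {
        totalTasks += progress.getTotalTaskCount();
      }
      return totalTasks;
    }

    public static void main(String[] args) {
      int maxTasks = 5;  // would come from hive.spark.job.max.tasks; -1 disables the check
      // Three stages with 3, 3 and 2 tasks: 8 in total, which exceeds the cap of 5.
      List<StageProgress> stages =
          Arrays.asList(new StageProgress(3), new StageProgress(3), new StageProgress(2));
      int total = getTotalTaskCount(stages);
      if (maxTasks != -1 && total > maxTasks) {
        // At this point RemoteSparkJobMonitor sets rc = 4 and the job is cancelled.
        System.out.println("Would cancel: " + total + " tasks > limit " + maxTasks);
      }
    }
  }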
true, "Spark groupByKey transformation has better performance but uses unbounded memory." + "Turn this off when there is a memory issue."), + SPARK_JOB_MAX_TASKS("hive.spark.job.max.tasks", -1, "The maximum number of tasks a Spark job may have.\n" + + "If a Spark job contains more tasks than the maximum, it will be cancelled. A value of -1 means no limit."), NWAYJOINREORDER("hive.reorder.nway.joins", true, "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"), HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true, http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 753f3a9..5ab3076 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1445,4 +1445,6 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\ groupby2_multi_distinct.q,\ groupby3_map_skew_multi_distinct.q,\ groupby3_multi_distinct.q,\ - groupby_grouping_sets7.q + groupby_grouping_sets7.q,\ + spark_job_max_tasks.q + http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java index 67064b8..1457db0 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java @@ -287,6 +287,7 @@ public class CliConfigs { excludesFrom(testConfigProps, "minimr.query.negative.files"); excludeQuery("authorization_uri_import.q"); + excludeQuery("spark_job_max_tasks.q"); setResultsDir("ql/src/test/results/clientnegative"); setLogDir("itests/qtest/target/qfile-results/clientnegative"); http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 32a7730..98b1605 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -129,8 +129,14 @@ public class SparkTask extends Task { // TODO: If the timeout is because of lack of resources in the cluster, we should // ideally also cancel the app request here. But w/o facilities from Spark or YARN, // it's difficult to do it on hive side alone. See HIVE-12650. + LOG.info("Failed to submit Spark job " + sparkJobID); + jobRef.cancelJob(); + } else if (rc == 4) { + LOG.info("The number of tasks reaches above the limit " + conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS) + + ". 

http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index dd73f3e..9dfb65e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -34,7 +34,8 @@ import org.apache.spark.JobExecutionStatus;
  * It print current job status to console and sleep current thread between monitor interval.
  */
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
-
+  private int sparkJobMaxTaskCount = -1;
+  private int totalTaskCount = 0;
   private RemoteSparkJobStatus sparkJobStatus;
   private final HiveConf hiveConf;
 
@@ -42,6 +43,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
     super(hiveConf);
     this.sparkJobStatus = sparkJobStatus;
     this.hiveConf = hiveConf;
+    sparkJobMaxTaskCount = hiveConf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS);
   }
 
   @Override
@@ -100,6 +102,17 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
             } else {
               console.logInfo(format);
             }
+          } else {
+            // Count the number of tasks, and kill the application if it goes beyond the limit.
+            if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) {
+              totalTaskCount = getTotalTaskCount(progressMap);
+              if (totalTaskCount > sparkJobMaxTaskCount) {
+                rc = 4;
+                done = true;
+                console.printInfo("\nThe total number of tasks in the Spark job [" + totalTaskCount + "] is greater than the limit [" +
+                    sparkJobMaxTaskCount + "]. The Spark job will be cancelled.");
+              }
+            }
           }
 
           printStatus(progressMap, lastProgressMap);
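A note on the hunk above: because of the totalTaskCount == 0 guard, the stages are summed only the first time a non-empty progress map is reported, so the limit is checked once per job rather than on every polling interval. Condensed, with explanatory comments (a paraphrase, not the literal Hive code):

  if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) {   // only on the first progress report
    totalTaskCount = getTotalTaskCount(progressMap);         // defined in SparkJobMonitor, next hunk
    if (totalTaskCount > sparkJobMaxTaskCount) {
      rc = 4;       // SparkTask maps return code 4 to jobRef.cancelJob()
      done = true;  // stop the monitor loop
    }
  }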

http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 0b224f2..41730b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -66,7 +66,6 @@ abstract class SparkJobMonitor {
 
   private int lines = 0;
   private final PrintStream out;
-  private static final int COLUMN_1_WIDTH = 16;
   private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s ";
   private static final String STAGE_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s ";
 
@@ -173,6 +172,15 @@ abstract class SparkJobMonitor {
     lastPrintTime = System.currentTimeMillis();
   }
 
+  protected int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) {
+    int totalTasks = 0;
+    for (SparkStageProgress progress : progressMap.values()) {
+      totalTasks += progress.getTotalTaskCount();
+    }
+
+    return totalTasks;
+  }
+
   private String getReport(Map<String, SparkStageProgress> progressMap) {
     StringBuilder reportBuffer = new StringBuilder();
     SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/spark_job_max_tasks.q b/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
new file mode 100644
index 0000000..7473050
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
@@ -0,0 +1,6 @@
+set hive.spark.job.max.tasks=2;
+
+EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
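The negative test above caps a job at two tasks, while the expected plan below needs more than that (the GROUP edge alone runs two reducers, plus the SORT reducer and the map tasks), so the job is cancelled and the query fails with return code 4. For illustration only (not part of the commit), the same behavior could be exercised from a client over JDBC; the endpoint URL is an assumption:

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.SQLException;
  import java.sql.Statement;

  public class MaxTasksJdbcExample {
    public static void main(String[] args) throws SQLException {
      // Hypothetical HiveServer2 endpoint; adjust host/port/database for a real cluster.
      try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
           Statement stmt = conn.createStatement()) {
        stmt.execute("set hive.spark.job.max.tasks=2");
        try {
          stmt.execute("SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s");
        } catch (SQLException e) {
          // Expected with the 2-task cap: the Spark job is cancelled and the error
          // surfaces as "return code 4 from org.apache.hadoop.hive.ql.exec.spark.SparkTask".
          System.err.println("Query cancelled: " + e.getMessage());
        }
      }
    }
  }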

http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out b/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
new file mode 100644
index 0000000..ba2f09e
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
@@ -0,0 +1,77 @@
+PREHOOK: query: EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 3 <- Reducer 2 (SORT, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: key, value
+                    Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: sum(value)
+                      keys: key (type: string)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: double)
+        Reducer 2
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col1 (type: double)
+                  sort order: +
+                  Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string)
+        Reducer 3
+            Reduce Operator Tree:
+              Select Operator
+                expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src1
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 4 from org.apache.hadoop.hive.ql.exec.spark.SparkTask