hive-commits mailing list archives

From w...@apache.org
Subject [16/36] hive git commit: HIVE-16799: Control the max number of tasks for a stage in a Spark job (Reviewed by Rui)
Date Tue, 06 Jun 2017 18:35:43 GMT
HIVE-16799: Control the max number of tasks for a stage in a Spark job (Reviewed by Rui)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/363ffe0a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/363ffe0a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/363ffe0a

Branch: refs/heads/hive-14535
Commit: 363ffe0ac7dec7e4804c1eb2ba76cb07660ae020
Parents: b560f49
Author: Xuefu Zhang <xuefu@uber.com>
Authored: Fri Jun 2 11:26:33 2017 -0700
Committer: Xuefu Zhang <xuefu@uber.com>
Committed: Fri Jun 2 11:26:33 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../test/resources/testconfiguration.properties |  3 +-
 .../hadoop/hive/cli/control/CliConfigs.java     |  1 +
 .../hadoop/hive/ql/exec/spark/SparkTask.java    |  2 +-
 .../spark/status/RemoteSparkJobMonitor.java     | 14 ++++
 .../ql/exec/spark/status/SparkJobMonitor.java   | 12 +++
 .../clientnegative/spark_stage_max_tasks.q      |  6 ++
 .../spark/spark_stage_max_tasks.q.out           | 77 ++++++++++++++++++++
 8 files changed, 115 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/363ffe0a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 176d36f..fce8db3 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3371,6 +3371,8 @@ public class HiveConf extends Configuration {
             "Turn this off when there is a memory issue."),
     SPARK_JOB_MAX_TASKS("hive.spark.job.max.tasks", -1, "The maximum number of tasks a Spark job may have.\n" +
             "If a Spark job contains more tasks than the maximum, it will be cancelled. A value of -1 means no limit."),
+    SPARK_STAGE_MAX_TASKS("hive.spark.stage.max.tasks", -1, "The maximum number of tasks a stage in a Spark job may have.\n" +
+        "If a Spark job stage contains more tasks than the maximum, the job will be cancelled. A value of -1 means no limit."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,
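
The new variable behaves like its job-level sibling: -1 (the default) disables the check. As a minimal standalone sketch, not part of this patch, here is how the limit can be set and read back through HiveConf (the class name StageLimitDemo and the value 500 are arbitrary examples):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class StageLimitDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Equivalent to "set hive.spark.stage.max.tasks=500;" in a Hive session.
        conf.setIntVar(HiveConf.ConfVars.SPARK_STAGE_MAX_TASKS, 500);
        // RemoteSparkJobMonitor reads the limit the same way; -1 means no limit.
        int limit = conf.getIntVar(HiveConf.ConfVars.SPARK_STAGE_MAX_TASKS);
        System.out.println("Per-stage task limit: " + limit);
      }
    }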

http://git-wip-us.apache.org/repos/asf/hive/blob/363ffe0a/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e613374..62462bd 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1459,5 +1459,6 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
   groupby3_map_skew_multi_distinct.q,\
   groupby3_multi_distinct.q,\
   groupby_grouping_sets7.q,\
-  spark_job_max_tasks.q
+  spark_job_max_tasks.q,\
+  spark_stage_max_tasks.q
 

http://git-wip-us.apache.org/repos/asf/hive/blob/363ffe0a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index 1457db0..27b87fb 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -288,6 +288,7 @@ public class CliConfigs {
         excludesFrom(testConfigProps, "minimr.query.negative.files");
         excludeQuery("authorization_uri_import.q");
         excludeQuery("spark_job_max_tasks.q");
+        excludeQuery("spark_stage_max_tasks.q");
 
         setResultsDir("ql/src/test/results/clientnegative");
         setLogDir("itests/qtest/target/qfile-results/clientnegative");

http://git-wip-us.apache.org/repos/asf/hive/blob/363ffe0a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index b4fb49f..2ee8c93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -138,7 +138,7 @@ public class SparkTask extends Task<SparkWork> {
         LOG.info("Failed to submit Spark job " + sparkJobID);
         killJob();
       } else if (rc == 4) {
-        LOG.info("The number of tasks reaches above the limit " + conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS)
+
+        LOG.info("The spark job or one stage of it has too many tasks" +
             ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
         killJob();
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/363ffe0a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 9dfb65e..37b8363 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -35,7 +35,9 @@ import org.apache.spark.JobExecutionStatus;
  */
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
   private int sparkJobMaxTaskCount = -1;
+  private int sparkStageMaxTaskCount = -1;
   private int totalTaskCount = 0;
+  private int stageMaxTaskCount = 0;
   private RemoteSparkJobStatus sparkJobStatus;
   private final HiveConf hiveConf;
 
@@ -44,6 +46,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
     this.sparkJobStatus = sparkJobStatus;
     this.hiveConf = hiveConf;
     sparkJobMaxTaskCount = hiveConf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS);
+    sparkStageMaxTaskCount = hiveConf.getIntVar(HiveConf.ConfVars.SPARK_STAGE_MAX_TASKS);
   }
 
   @Override
@@ -103,6 +106,17 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
                 console.logInfo(format);
               }
             } else {
+              // Find the maximum task count across the job's stages and cancel the job if it exceeds the limit.
+              if (sparkStageMaxTaskCount != -1 && stageMaxTaskCount == 0) {
+                stageMaxTaskCount = getStageMaxTaskCount(progressMap);
+                if (stageMaxTaskCount > sparkStageMaxTaskCount) {
+                  rc = 4;
+                  done = true;
+                  console.printInfo("\nThe number of task in one stage of the Spark job ["
+ stageMaxTaskCount + "] is greater than the limit [" +
+                      sparkStageMaxTaskCount + "]. The Spark job will be cancelled.");
+                }
+              }
+
               // Count the number of tasks, and kill application if it goes beyond the limit.
               if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) {
                 totalTaskCount = getTotalTaskCount(progressMap);
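
The stage check mirrors the job-level check just below it: it runs only once (while the cached stageMaxTaskCount is still 0), takes the largest task count across all stages, and sets rc = 4 so that SparkTask kills the job. A self-contained sketch of that guard, under the assumption that stage progress has been reduced to a plain map of stage names to task counts (the class and method names here are illustrative, not Hive APIs):

    import java.util.HashMap;
    import java.util.Map;

    public class StageGuardSketch {
      // True when some stage's task count exceeds a non-negative limit;
      // a limit of -1 disables the check, matching the HiveConf default.
      static boolean exceedsStageLimit(Map<String, Integer> stageTaskCounts, int limit) {
        if (limit == -1) {
          return false;
        }
        int max = 0;
        for (int tasks : stageTaskCounts.values()) {
          max = Math.max(max, tasks);
        }
        return max > limit;
      }

      public static void main(String[] args) {
        Map<String, Integer> stages = new HashMap<>();
        stages.put("Map 1", 120);
        stages.put("Reducer 2", 900);
        // Prints true: in the real monitor this is where rc is set to 4
        // and SparkTask subsequently kills the job.
        System.out.println(exceedsStageLimit(stages, 500));
      }
    }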

http://git-wip-us.apache.org/repos/asf/hive/blob/363ffe0a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 41730b5..078a57d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -181,6 +181,18 @@ abstract class SparkJobMonitor {
     return totalTasks;
   }
 
+  protected int getStageMaxTaskCount(Map<String, SparkStageProgress> progressMap) {
+    int stageMaxTasks = 0;
+    for (SparkStageProgress progress : progressMap.values()) {
+      int tasks = progress.getTotalTaskCount();
+      if (tasks > stageMaxTasks) {
+        stageMaxTasks = tasks;
+      }
+    }
+
+    return stageMaxTasks;
+  }
+
   private String getReport(Map<String, SparkStageProgress> progressMap) {
     StringBuilder reportBuffer = new StringBuilder();
     SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
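
The new getStageMaxTaskCount is a plain max-scan over the per-stage progress map, in the same style as the existing getTotalTaskCount. An equivalent stream formulation, shown only as an illustrative alternative and not part of the patch:

    // Hypothetical stream-based equivalent of getStageMaxTaskCount:
    protected int getStageMaxTaskCount(Map<String, SparkStageProgress> progressMap) {
      return progressMap.values().stream()
          .mapToInt(SparkStageProgress::getTotalTaskCount)
          .max()
          .orElse(0);
    }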

http://git-wip-us.apache.org/repos/asf/hive/blob/363ffe0a/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q b/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
new file mode 100644
index 0000000..5bdb014
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
@@ -0,0 +1,6 @@
+set hive.spark.stage.max.tasks=1;
+
+EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
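
Forcing the limit to 1 makes this negative test deterministic: in the expected plan below, the GROUP BY stage runs with two reduce tasks (Reducer 2 <- Map 1 (GROUP, 2)), so that stage exceeds the limit and the second SELECT fails with return code 4.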

http://git-wip-us.apache.org/repos/asf/hive/blob/363ffe0a/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out b/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
new file mode 100644
index 0000000..ba2f09e
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
@@ -0,0 +1,77 @@
+PREHOOK: query: EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 3 <- Reducer 2 (SORT, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: key, value
+                    Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: sum(value)
+                      keys: key (type: string)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: double)
+        Reducer 2 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col1 (type: double)
+                  sort order: +
+                  Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string)
+        Reducer 3 
+            Reduce Operator Tree:
+              Select Operator
+                expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src1
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 4 from org.apache.hadoop.hive.ql.exec.spark.SparkTask

