hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sunc...@apache.org
Subject hive git commit: HIVE-18283: Better error message and error code for HoS exceptions (Chao Sun, reviewed by Xuefu Zhang and Andrew Sherman)
Date Wed, 20 Dec 2017 00:28:26 GMT
Repository: hive
Updated Branches:
  refs/heads/master 00212e030 -> 14df3b021


HIVE-18283: Better error message and error code for HoS exceptions (Chao Sun, reviewed by
Xuefu Zhang and Andrew Sherman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/14df3b02
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/14df3b02
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/14df3b02

Branch: refs/heads/master
Commit: 14df3b0212306a0a2d60176c26f710378037a5a1
Parents: 00212e0
Author: Chao Sun <sunchao@apache.org>
Authored: Tue Dec 19 16:27:40 2017 -0800
Committer: Chao Sun <sunchao@apache.org>
Committed: Tue Dec 19 16:27:40 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     | 41 ++++++++-
 .../hadoop/hive/ql/exec/spark/SparkTask.java    | 31 ++++++-
 .../ql/exec/spark/session/SparkSessionImpl.java | 94 +++++++++++++++++++-
 .../spark/status/RemoteSparkJobMonitor.java     | 16 ++--
 .../spark/status/impl/RemoteSparkJobStatus.java |  4 +-
 .../session/TestSparkSessionManagerImpl.java    | 71 +++++++++++++++
 6 files changed, 244 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/14df3b02/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 6b949d2..476c261 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -29,6 +29,7 @@ import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.ASTNodeOrigin;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
@@ -464,7 +465,9 @@ public enum ErrorMsg {
   HIVE_GROUPING_FUNCTION_EXPR_NOT_IN_GROUPBY(10409, "Expression in GROUPING function not
present in GROUP BY"),
   ALTER_TABLE_NON_PARTITIONED_TABLE_CASCADE_NOT_SUPPORTED(10410,
       "Alter table with non-partitioned table does not support cascade"),
+
   //========================== 20000 range starts here ========================//
+
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script.
"
       + "It may have crashed with an error."),
@@ -492,10 +495,15 @@ public enum ErrorMsg {
   FILE_NOT_FOUND(20012, "File not found: {0}", "64000", true),
   WRONG_FILE_FORMAT(20013, "Wrong file format. Please check the file's format.", "64000",
true),
 
+  SPARK_CREATE_CLIENT_INVALID_QUEUE(20014, "Spark job is submitted to an invalid queue: {0}."
+      + " Please fix and try again.", true),
+  SPARK_RUNTIME_OOM(20015, "Spark job failed because of out of memory."),
+
   // An exception from runtime that will show the full stack to client
   UNRESOLVED_RT_EXCEPTION(29999, "Runtime Error: {0}", "58004", true),
 
   //========================== 30000 range starts here ========================//
+
   STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. " +
     "There was a error to retrieve the StatsPublisher, and retrying " +
     "might help. If you dont want the query to fail because accurate statistics " +
@@ -535,7 +543,6 @@ public enum ErrorMsg {
       "to fail because of this, set hive.stats.atomic=false", true),
   STATS_SKIPPING_BY_ERROR(30017, "Skipping stats aggregation by error {0}", true),
 
-
   INVALID_FILE_FORMAT_IN_LOAD(30019, "The file that you are trying to load does not match
the" +
       " file format of the destination table."),
 
@@ -555,7 +562,37 @@ public enum ErrorMsg {
   CONCATENATE_UNSUPPORTED_PARTITION_ARCHIVED(30032, "Concatenate/Merge can not be performed
on archived partitions"),
   CONCATENATE_UNSUPPORTED_TABLE_NON_NATIVE(30033, "Concatenate/Merge can not be performed
on non-native tables"),
   CONCATENATE_UNSUPPORTED_TABLE_NOT_MANAGED(30034, "Concatenate/Merge can only be performed
on managed tables"),
-  CONCATENATE_UNSUPPORTED_TABLE_TRANSACTIONAL(30035, "Concatenate/Merge can not be performed
on transactional tables")
+  CONCATENATE_UNSUPPORTED_TABLE_TRANSACTIONAL(30035,
+      "Concatenate/Merge can not be performed on transactional tables"),
+
+  SPARK_GET_JOB_INFO_TIMEOUT(30036,
+      "Spark job timed out after {0} seconds while getting job info", true),
+  SPARK_JOB_MONITOR_TIMEOUT(30037, "Job hasn't been submitted after {0}s." +
+      " Aborting it.\nPossible reasons include network issues, " +
+      "errors in remote driver or the cluster has no available resources, etc.\n" +
+      "Please check YARN or Spark driver's logs for further information.\n" +
+      "The timeout is controlled by " + HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT + ".",
true),
+
+  // Various errors when creating Spark client
+  SPARK_CREATE_CLIENT_TIMEOUT(30038,
+      "Timed out while creating Spark client for session {0}.", true),
+  SPARK_CREATE_CLIENT_QUEUE_FULL(30039,
+      "Failed to create Spark client because job queue is full: {0}.", true),
+  SPARK_CREATE_CLIENT_INTERRUPTED(30040,
+      "Interrupted while creating Spark client for session {0}", true),
+  SPARK_CREATE_CLIENT_ERROR(30041,
+      "Failed to create Spark client for Spark session {0}", true),
+  SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST(30042,
+      "Failed to create Spark client due to invalid resource request: {0}", true),
+  SPARK_CREATE_CLIENT_CLOSED_SESSION(30043,
+      "Cannot create Spark client on a closed session {0}", true),
+
+  SPARK_JOB_INTERRUPTED(30044, "Spark job was interrupted while executing"),
+
+  //========================== 40000 range starts here ========================//
+
+  SPARK_JOB_RUNTIME_ERROR(40001,
+      "Spark job failed during runtime. Please check stacktrace for the root cause.")
   ;
 
   private int errorCode;

http://git-wip-us.apache.org/repos/asf/hive/blob/14df3b02/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 0f5f708..6915cf1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Throwables;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.slf4j.Logger;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -67,6 +69,7 @@ import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.collect.Lists;
+import org.apache.spark.SparkException;
 
 public class SparkTask extends Task<SparkWork> {
   private static final String CLASS_NAME = SparkTask.class.getName();
@@ -155,7 +158,12 @@ public class SparkTask extends Task<SparkWork> {
       console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
       LOG.error(msg, e);
       setException(e);
-      rc = 1;
+      if (e instanceof HiveException) {
+        HiveException he = (HiveException) e;
+        rc = he.getCanonicalErrorMsg().getErrorCode();
+      } else {
+        rc = 1;
+      }
     } finally {
       startTime = perfLogger.getEndTime(PerfLogger.SPARK_SUBMIT_TO_RUNNING);
       // The startTime may not be set if the sparkTask finished too fast,
@@ -417,11 +425,30 @@ public class SparkTask extends Task<SparkWork> {
               error.getCause() instanceof InterruptedException)) {
             killJob();
           }
-          setException(error);
+          HiveException he;
+          if (isOOMError(error)) {
+            he = new HiveException(error, ErrorMsg.SPARK_RUNTIME_OOM);
+          } else {
+            he = new HiveException(error, ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
+          }
+          setException(he);
         }
       }
     } catch (Exception e) {
       LOG.error("Failed to get Spark job information", e);
     }
   }
+
+  private boolean isOOMError(Throwable error) {
+    while (error != null) {
+      if (error instanceof OutOfMemoryError) {
+        return true;
+      } else if (error instanceof SparkException) {
+        String sts = Throwables.getStackTraceAsString(error);
+        return sts.contains("Container killed by YARN for exceeding memory limits");
+      }
+      error = error.getCause();
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/14df3b02/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index ba61868..0c61566 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -18,12 +18,20 @@
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,15 +52,30 @@ public class SparkSessionImpl implements SparkSession {
   private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class);
   private static final String SPARK_DIR = "_spark_session_dir";
 
+  /** Regex for different Spark session error messages */
+  private static final String AM_TIMEOUT_ERR = ".*ApplicationMaster for attempt.*timed out.*";
+  private static final String UNKNOWN_QUEUE_ERR = "(submitted by user.*to unknown queue:.*)\n";
+  private static final String STOPPED_QUEUE_ERR = "(Queue.*is STOPPED)";
+  private static final String FULL_QUEUE_ERR = "(Queue.*already has.*applications)";
+  private static final String INVALILD_MEM_ERR =
+      "(Required executor memory.*is above the max threshold.*) of this";
+  private static final String INVALID_CORE_ERR =
+      "(initial executor number.*must between min executor.*and max executor number.*)\n";
+
+  /** Pre-compiled error patterns. Shared between all Spark sessions */
+  private static Map<String, Pattern> errorPatterns;
+
   private HiveConf conf;
   private boolean isOpen;
   private final String sessionId;
   private HiveSparkClient hiveSparkClient;
   private Path scratchDir;
   private final Object dirLock = new Object();
+  private String matchedString = null;
 
   public SparkSessionImpl() {
     sessionId = makeSessionId();
+    initErrorPatterns();
   }
 
   @Override
@@ -64,9 +87,13 @@ public class SparkSessionImpl implements SparkSession {
       hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId);
     } catch (Throwable e) {
       // It's possible that user session is closed while creating Spark client.
-      String msg = isOpen ? "Failed to create Spark client for Spark session " + sessionId
:
-        "Spark Session " + sessionId + " is closed before Spark client is created";
-      throw new HiveException(msg, e);
+      HiveException he;
+      if (isOpen) {
+        he = getHiveException(e);
+      } else {
+        he = new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_CLOSED_SESSION, sessionId);
+      }
+      throw he;
     }
     LOG.info("Spark session {} is successfully opened", sessionId);
   }
@@ -152,6 +179,67 @@ public class SparkSessionImpl implements SparkSession {
     return sparkDir;
   }
 
+  /**
+   * Populates the pre-compiled error pattern table shared by all Spark sessions.
+   * <p>
+   * Every constructor calls this. The table is only built on the first call and is published
+   * as an immutable map, so a session never observes the shared table being replaced by
+   * another session's constructor.
+   */
+  private static synchronized void initErrorPatterns() {
+    if (errorPatterns != null) {
+      return;
+    }
+    errorPatterns = new ImmutableMap.Builder<String, Pattern>()
+        .put(AM_TIMEOUT_ERR, Pattern.compile(AM_TIMEOUT_ERR))
+        .put(UNKNOWN_QUEUE_ERR, Pattern.compile(UNKNOWN_QUEUE_ERR))
+        .put(STOPPED_QUEUE_ERR, Pattern.compile(STOPPED_QUEUE_ERR))
+        .put(FULL_QUEUE_ERR, Pattern.compile(FULL_QUEUE_ERR))
+        .put(INVALILD_MEM_ERR, Pattern.compile(INVALILD_MEM_ERR))
+        .put(INVALID_CORE_ERR, Pattern.compile(INVALID_CORE_ERR))
+        .build();
+  }
+
+  /**
+   * Translates a failure raised while creating the Spark client into a HiveException that
+   * carries a specific {@link ErrorMsg}.
+   * <p>
+   * The cause chain is walked from {@code e} downwards and stops at the first throwable that
+   * is a TimeoutException, an InterruptedException or a RuntimeException. A RuntimeException
+   * is classified by matching its full stack trace against the known error patterns; the text
+   * captured by the matching pattern (if any) is recorded for {@link #getMatchedString()}.
+   * If nothing in the chain is recognised, SPARK_CREATE_CLIENT_ERROR is used.
+   *
+   * @param e the failure thrown while creating the Spark client
+   * @return the HiveException to report; never null
+   */
+  @VisibleForTesting
+  HiveException getHiveException(Throwable e) {
+    // Drop any capture left over from an earlier call so getMatchedString() only reflects
+    // the classification performed here.
+    matchedString = null;
+    Throwable oe = e;
+    while (e != null) {
+      if (e instanceof TimeoutException) {
+        // SPARK_CREATE_CLIENT_TIMEOUT's message has a {0} placeholder for the session id.
+        return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT, sessionId);
+      } else if (e instanceof InterruptedException) {
+        return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INTERRUPTED, sessionId);
+      } else if (e instanceof RuntimeException) {
+        String sts = Throwables.getStackTraceAsString(e);
+        if (matches(sts, AM_TIMEOUT_ERR)) {
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT, sessionId);
+        } else if (matches(sts, UNKNOWN_QUEUE_ERR) || matches(sts, STOPPED_QUEUE_ERR)) {
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, matchedString);
+        } else if (matches(sts, FULL_QUEUE_ERR)) {
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, matchedString);
+        } else if (matches(sts, INVALILD_MEM_ERR) || matches(sts, INVALID_CORE_ERR)) {
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
+              matchedString);
+        } else {
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId);
+        }
+      }
+      e = e.getCause();
+    }
+
+    return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId);
+  }
+
+  /**
+   * Returns the text captured by the last error pattern that matched during the most recent
+   * {@link #getHiveException(Throwable)} call, or null if no capturing pattern matched.
+   */
+  @VisibleForTesting
+  String getMatchedString() {
+    return matchedString;
+  }
+
+  /**
+   * Returns whether the pre-compiled pattern registered under {@code regex} finds a match in
+   * {@code input}. When it matches and the pattern has exactly one capturing group, the
+   * captured text is stored in {@code matchedString}.
+   *
+   * @param input the text to search (a printed stack trace)
+   * @param regex one of the error regex constants used as keys of the pattern table
+   * @return true on a match; false on no match or if {@code regex} has no registered pattern
+   */
+  private boolean matches(String input, String regex) {
+    Pattern p = errorPatterns.get(regex);
+    if (p == null) {
+      LOG.warn("No error pattern found for regex: {}", regex);
+      return false;
+    }
+    Matcher m = p.matcher(input);
+    boolean result = m.find();
+    if (result && m.groupCount() == 1) {
+      this.matchedString = m.group(1);
+    }
+    return result;
+  }
+
   private void cleanScratchDir() throws IOException {
     if (scratchDir != null) {
       FileSystem fs = scratchDir.getFileSystem(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/14df3b02/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 37b8363..f94ad0c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -23,9 +23,11 @@ import java.util.Map;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.spark.client.JobHandle;
 import org.apache.spark.JobExecutionStatus;
 
@@ -70,11 +72,11 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
         case QUEUED:
           long timeCount = (System.currentTimeMillis() - startTime) / 1000;
           if ((timeCount > monitorTimeoutInterval)) {
-            console.printError("Job hasn't been submitted after " + timeCount + "s." +
-                " Aborting it.\nPossible reasons include network issues, " +
-                "errors in remote driver or the cluster has no available resources, etc.\n"
+
-                "Please check YARN or Spark driver's logs for further information.");
+            HiveException he = new HiveException(ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT,
+                Long.toString(timeCount));
+            console.printError(he.getMessage());
             console.printError("Status: " + state);
+            sparkJobStatus.setError(he);
             running = false;
             done = true;
             rc = 2;
@@ -181,6 +183,10 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
           Thread.sleep(checkInterval);
         }
       } catch (Exception e) {
+        Exception finalException = e;
+        if (e instanceof InterruptedException) {
+          finalException = new HiveException(e, ErrorMsg.SPARK_JOB_INTERRUPTED);
+        }
         String msg = " with exception '" + Utilities.getNameMessage(e) + "'";
         msg = "Failed to monitor Job[" + sparkJobStatus.getJobId() + "]" + msg;
 
@@ -190,7 +196,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
         console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
         rc = 1;
         done = true;
-        sparkJobStatus.setError(e);
+        sparkJobStatus.setError(finalException);
       } finally {
         if (done) {
           break;

http://git-wip-us.apache.org/repos/asf/hive/blob/14df3b02/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 67db303..d93bd8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -174,7 +175,8 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
       return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
     } catch (Exception e) {
       LOG.warn("Failed to get job info.", e);
-      throw new HiveException(e);
+      throw new HiveException(e, ErrorMsg.SPARK_GET_JOB_INFO_TIMEOUT,
+          Long.toString(sparkClientTimeoutInSeconds));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/14df3b02/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
index 47d2437..291ed85 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,6 +27,7 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
 import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
@@ -129,6 +131,75 @@ public class TestSparkSessionManagerImpl {
     checkSparkConf(conf, sparkCloneConfiguration, "true");
   }
 
+  /**
+   * Checks that getHiveException maps representative Spark client creation failures to the
+   * expected ErrorMsg, and that the text captured from the stack trace is the expected one.
+   */
+  @Test
+  public void testGetHiveException() throws Exception {
+    HiveConf conf = new HiveConf();
+    conf.set("spark.master", "local");
+    SparkSessionManager ssm = SparkSessionManagerImpl.getInstance();
+    SparkSessionImpl ss = (SparkSessionImpl) ssm.getSession(
+        null, conf, true);
+
+    try {
+      Throwable e;
+
+      e = new TimeoutException();
+      checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
+
+      e = new InterruptedException();
+      checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_INTERRUPTED);
+
+      e = new RuntimeException("\t diagnostics: Application application_1508358311878_3322732 "
+          + "failed 1 times due to ApplicationMaster for attempt "
+          + "appattempt_1508358311878_3322732_000001 timed out. Failing the application.");
+      checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
+
+      e = new RuntimeException("\t diagnostics: Application application_1508358311878_3330000 "
+          + "submitted by user hive to unknown queue: foo");
+      checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE,
+          "submitted by user hive to unknown queue: foo");
+
+      e = new RuntimeException("\t diagnostics: "
+          + "org.apache.hadoop.security.AccessControlException: "
+          + "Queue root.foo is STOPPED. Cannot accept submission of application: "
+          + "application_1508358311878_3369187");
+      checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE,
+          "Queue root.foo is STOPPED");
+
+      e = new RuntimeException("\t diagnostics: "
+          + "org.apache.hadoop.security.AccessControlException: "
+          + "Queue root.foo already has 10 applications, "
+          + "cannot accept submission of application: "
+          + "application_1508358311878_3384544");
+      checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL,
+          "Queue root.foo already has 10 applications");
+
+      e = new RuntimeException("Exception in thread \"\"main\"\" "
+          + "java.lang.IllegalArgumentException: "
+          + "Required executor memory (7168+10240 MB) "
+          + "is above the max threshold (16384 MB) of this "
+          + "cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or "
+          + "'yarn.nodemanager.resource.memory-mb'.");
+      checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
+          "Required executor memory (7168+10240 MB) "
+              + "is above the max threshold (16384 MB)");
+
+      e = new RuntimeException("Exception in thread \"\"main\"\" "
+          + "java.lang.IllegalArgumentException: "
+          + "requirement failed: initial executor number 5 must between min executor number10 "
+          + "and max executor number 50");
+      checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
+          "initial executor number 5 must between min executor number10 "
+              + "and max executor number 50");
+
+      // Other exceptions default to SPARK_CREATE_CLIENT_ERROR
+      e = new Exception();
+      checkHiveException(ss, e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR);
+    } finally {
+      // The session was opened above (doOpen = true); release it through the manager so it
+      // does not outlive this test.
+      ssm.closeSession(ss);
+    }
+  }
+
+  /**
+   * Asserts that {@code e} is translated to {@code expectedErrMsg}, without checking any
+   * captured text.
+   */
+  private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg expectedErrMsg) {
+    checkHiveException(ss, e, expectedErrMsg, null);
+  }
+
+  /**
+   * Asserts that {@code e} is translated to {@code expectedErrMsg} and, when
+   * {@code expectedMatchedStr} is non-null, that the session captured exactly that text.
+   */
+  private void checkHiveException(SparkSessionImpl ss, Throwable e,
+      ErrorMsg expectedErrMsg, String expectedMatchedStr) {
+    ErrorMsg actualErrMsg = ss.getHiveException(e).getCanonicalErrorMsg();
+    assertEquals(expectedErrMsg, actualErrMsg);
+    if (expectedMatchedStr == null) {
+      return;
+    }
+    assertEquals(expectedMatchedStr, ss.getMatchedString());
+  }
+
   /**
    * Force a Spark config to be generated and check that a config value has the expected
value
    * @param conf the Hive config to use as a base


Mime
View raw message