hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kzsom...@apache.org
Subject hive git commit: HIVE-17078: Add more logs to MapredLocalTask (Yibing Shi via Barna Zsombor Klara)
Date Fri, 06 Oct 2017 13:36:08 GMT
Repository: hive
Updated Branches:
  refs/heads/master 593ca11e4 -> c1f3d9a48


HIVE-17078: Add more logs to MapredLocalTask (Yibing Shi via Barna Zsombor Klara)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c1f3d9a4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c1f3d9a4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c1f3d9a4

Branch: refs/heads/master
Commit: c1f3d9a48b6122b3b5e5bf03ec1e524c5102f3b2
Parents: 593ca11
Author: Barna Zsombor Klara <zsombor.klara@cloudera.com>
Authored: Fri Oct 6 15:29:30 2017 +0200
Committer: Barna Zsombor Klara <zsombor.klara@cloudera.com>
Committed: Fri Oct 6 15:29:30 2017 +0200

----------------------------------------------------------------------
 .../hadoop/hive/common/log/LogRedirector.java   | 99 ++++++++++++++++++++
 .../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 24 ++++-
 .../results/clientpositive/auto_join25.q.out    |  4 +
 .../auto_join_without_localtask.q.out           |  2 +
 .../bucketsortoptimize_insert_8.q.out           |  2 +
 .../infer_bucket_sort_convert_join.q.out        |  1 +
 .../results/clientpositive/mapjoin_hook.q.out   |  3 +
 .../hive/spark/client/SparkClientImpl.java      | 63 ++-----------
 8 files changed, 138 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/common/src/java/org/apache/hadoop/hive/common/log/LogRedirector.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/log/LogRedirector.java b/common/src/java/org/apache/hadoop/hive/common/log/LogRedirector.java
new file mode 100644
index 0000000..c0650ed
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/log/LogRedirector.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.log;
+
+import org.slf4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import java.util.List;
+
+/**
+ * Class used to redirect output read from a stream to a logger
+ */
+public class LogRedirector implements Runnable {
+
+  private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000;
+
+  public interface LogSourceCallback {
+    boolean isAlive();
+  }
+
+  private final Logger logger;
+  private final BufferedReader in;
+  private final LogSourceCallback callback;
+  private List<String> errLogs;
+  private int numErrLogLines = 0;
+
+  public LogRedirector(InputStream in, Logger logger, LogSourceCallback callback) {
+    this.in = new BufferedReader(new InputStreamReader(in));
+    this.callback = callback;
+    this.logger = logger;
+  }
+
+  public LogRedirector(InputStream in, Logger logger, List<String> errLogs,
+                       LogSourceCallback callback) {
+    this.in = new BufferedReader(new InputStreamReader(in));
+    this.errLogs = errLogs;
+    this.callback = callback;
+    this.logger = logger;
+  }
+
+  @Override
+  public void run() {
+    try {
+      String line = null;
+      while ((line = in.readLine()) != null) {
+        logger.info(line);
+        if (errLogs != null) {
+          if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) {
+            errLogs.add(line);
+          }
+        }
+      }
+    } catch (IOException e) {
+      if (callback.isAlive()) {
+        logger.warn("I/O error in redirector thread.", e);
+      } else {
+        // When stopping the process we are redirecting from,
+        // the streams might be closed during reading.
+        // We should not log the related exceptions in a visible level
+        // as they might mislead the user.
+        logger.debug("I/O error in redirector thread while stopping the remote driver", e);
+      }
+    } catch (Exception e) {
+      logger.warn("Error in redirector thread.", e);
+    }
+  }
+
+  /**
+   * Start the logredirector in a new thread
+   * @param name name of the new thread
+   * @param redirector redirector to start
+   */
+  public static void redirect(String name, LogRedirector redirector) {
+    Thread thread = new Thread(redirector);
+    thread.setName(name);
+    thread.setDaemon(true);
+    thread.start();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 9dfefee..b6a988d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.io.CachingPrintStream;
+import org.apache.hadoop.hive.common.log.LogRedirector;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -79,6 +80,7 @@ import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.StreamPrinter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -325,6 +327,15 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
       // Run ExecDriver in another JVM
       executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
 
+      final LogRedirector.LogSourceCallback callback = () -> {return executor.isAlive();};
+
+      LogRedirector.redirect(
+          Thread.currentThread().getName() + "-LocalTask-" + getName() + "-stdout",
+          new LogRedirector(executor.getInputStream(), LOG, callback));
+      LogRedirector.redirect(
+          Thread.currentThread().getName() + "-LocalTask-" + getName() + "-stderr",
+          new LogRedirector(executor.getErrorStream(), LOG, callback));
+
       CachingPrintStream errPrintStream = new CachingPrintStream(System.err);
 
       StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
@@ -384,14 +395,19 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
       console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: "
           + Utilities.showTime(elapsed) + " sec.");
     } catch (Throwable throwable) {
+      int retVal;
+      String message;
       if (throwable instanceof OutOfMemoryError
           || (throwable instanceof MapJoinMemoryExhaustionError)) {
-        l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable);
-        return 3;
+        message = "Hive Runtime Error: Map local work exhausted memory";
+        retVal = 3;
       } else {
-        l4j.error("Hive Runtime Error: Map local work failed", throwable);
-        return 2;
+        message = "Hive Runtime Error: Map local work failed";
+        retVal = 2;
       }
+      l4j.error(message, throwable);
+      console.printError(message, HiveStringUtils.stringifyException(throwable));
+      return retVal;
     }
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/ql/src/test/results/clientpositive/auto_join25.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/auto_join25.q.out b/ql/src/test/results/clientpositive/auto_join25.q.out
index 534bdb6..d24e0c3 100644
--- a/ql/src/test/results/clientpositive/auto_join25.q.out
+++ b/ql/src/test/results/clientpositive/auto_join25.q.out
@@ -18,6 +18,7 @@ PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
 PREHOOK: Output: default@dest1
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: FROM srcpart src1 JOIN src src2 ON (src1.key = src2.key)
@@ -62,8 +63,10 @@ INSERT OVERWRITE TABLE dest_j2 SELECT src1.key, src3.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@dest_j2
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
@@ -105,6 +108,7 @@ INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@dest_j1
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key)

http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/ql/src/test/results/clientpositive/auto_join_without_localtask.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/auto_join_without_localtask.q.out b/ql/src/test/results/clientpositive/auto_join_without_localtask.q.out
index 57f0067..a8ae000 100644
--- a/ql/src/test/results/clientpositive/auto_join_without_localtask.q.out
+++ b/ql/src/test/results/clientpositive/auto_join_without_localtask.q.out
@@ -1045,8 +1045,10 @@ PREHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 #### A masked pattern was here ####
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 order by a.key, a.value limit 40

http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out b/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out
index f0e77f0..1b3d741 100644
--- a/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out
+++ b/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out
@@ -273,6 +273,7 @@ PREHOOK: Input: default@test_table1@ds=1
 PREHOOK: Input: default@test_table2
 PREHOOK: Input: default@test_table2@ds=1
 PREHOOK: Output: default@test_table3@ds=1
+Hive Runtime Error: Map local work failed
 FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: INSERT OVERWRITE TABLE test_table3 PARTITION (ds = '1')
@@ -551,6 +552,7 @@ PREHOOK: Input: default@test_table1@ds=1
 PREHOOK: Input: default@test_table2
 PREHOOK: Input: default@test_table2@ds=1
 PREHOOK: Output: default@test_table3@ds=1
+Hive Runtime Error: Map local work failed
 FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: INSERT OVERWRITE TABLE test_table3 PARTITION (ds = '1')

http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out b/ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out
index 70a57d5..09ab2d9 100644
--- a/ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out
+++ b/ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out
@@ -60,6 +60,7 @@ SELECT a.key, b.value FROM src a JOIN src b ON a.key = b.key
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@test_table@part=1
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: INSERT OVERWRITE TABLE test_table PARTITION (part = '1') 

http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/ql/src/test/results/clientpositive/mapjoin_hook.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/mapjoin_hook.q.out b/ql/src/test/results/clientpositive/mapjoin_hook.q.out
index a9f9be3..f80a26a 100644
--- a/ql/src/test/results/clientpositive/mapjoin_hook.q.out
+++ b/ql/src/test/results/clientpositive/mapjoin_hook.q.out
@@ -38,6 +38,7 @@ PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
 PREHOOK: Output: default@dest1
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 [MapJoinCounter PostHook] COMMON_JOIN: 0 HINTED_MAPJOIN: 0 HINTED_MAPJOIN_LOCAL: 0 CONVERTED_MAPJOIN: 0 CONVERTED_MAPJOIN_LOCAL: 1 BACKUP_COMMON_JOIN: 1
@@ -51,8 +52,10 @@ INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@dest1
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 [MapJoinCounter PostHook] COMMON_JOIN: 0 HINTED_MAPJOIN: 0 HINTED_MAPJOIN_LOCAL: 0 CONVERTED_MAPJOIN: 0 CONVERTED_MAPJOIN_LOCAL: 2 BACKUP_COMMON_JOIN: 2

http://git-wip-us.apache.org/repos/asf/hive/blob/c1f3d9a4/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 03e773a..e0ec3b7 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -32,13 +32,10 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
 
-import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.io.Writer;
@@ -53,9 +50,9 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.log.LogRedirector;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -75,7 +72,6 @@ class SparkClientImpl implements SparkClient {
   private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class);
 
   private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds
-  private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000;
 
   private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
   private static final String SPARK_HOME_ENV = "SPARK_HOME";
@@ -490,8 +486,12 @@ class SparkClientImpl implements SparkClient {
       final Process child = pb.start();
       String threadName = Thread.currentThread().getName();
       final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
-      redirect("RemoteDriver-stdout-redir-" + threadName, new Redirector(child.getInputStream()));
-      redirect("RemoteDriver-stderr-redir-" + threadName, new Redirector(child.getErrorStream(), childErrorLog));
+      final LogRedirector.LogSourceCallback callback = () -> {return isAlive;};
+
+      LogRedirector.redirect("RemoteDriver-stdout-redir-" + threadName,
+          new LogRedirector(child.getInputStream(), LOG, callback));
+      LogRedirector.redirect("RemoteDriver-stderr-redir-" + threadName,
+          new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
 
       runnable = new Runnable() {
         @Override
@@ -542,13 +542,6 @@ class SparkClientImpl implements SparkClient {
     return null;
   }
 
-  private void redirect(String name, Redirector redirector) {
-    Thread thread = new Thread(redirector);
-    thread.setName(name);
-    thread.setDaemon(true);
-    thread.start();
-  }
-
   private class ClientProtocol extends BaseProtocol {
 
    <T extends Serializable> JobHandleImpl<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) {
@@ -653,48 +646,6 @@ class SparkClientImpl implements SparkClient {
 
   }
 
-  private class Redirector implements Runnable {
-
-    private final BufferedReader in;
-    private List<String> errLogs;
-    private int numErrLogLines = 0;
-
-    Redirector(InputStream in) {
-      this.in = new BufferedReader(new InputStreamReader(in));
-    }
-
-    Redirector(InputStream in, List<String> errLogs) {
-      this.in = new BufferedReader(new InputStreamReader(in));
-      this.errLogs = errLogs;
-    }
-
-    @Override
-    public void run() {
-      try {
-        String line = null;
-        while ((line = in.readLine()) != null) {
-          LOG.info(line);
-          if (errLogs != null) {
-            if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) {
-              errLogs.add(line);
-            }
-          }
-        }
-      } catch (IOException e) {
-        if (isAlive) {
-          LOG.warn("I/O error in redirector thread.", e);
-        } else {
-      // When stopping the remote driver the process might be destroyed during reading from the stream.
-      // We should not log the related exceptions in a visible level as they might mislead the user.
-          LOG.debug("I/O error in redirector thread while stopping the remote driver", e);
-        }
-      } catch (Exception e) {
-        LOG.warn("Error in redirector thread.", e);
-      }
-    }
-
-  }
-
   private static class AddJarJob implements Job<Serializable> {
     private static final long serialVersionUID = 1L;
 


Mime
View raw message