hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject [02/13] hive git commit: HIVE-15039: A better job monitor console output for HoS (Rui reviewed by Xuefu and Ferdinand)
Date Tue, 15 Nov 2016 03:31:25 GMT
HIVE-15039: A better job monitor console output for HoS (Rui reviewed by Xuefu and Ferdinand)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/345353c0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/345353c0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/345353c0

Branch: refs/heads/hive-14535
Commit: 345353c0ea5d3ddda9f6d89cbf8cd0e92726fcb6
Parents: 7c26391
Author: Rui Li <lirui@apache.org>
Authored: Thu Nov 3 10:45:24 2016 +0800
Committer: Rui Li <shlr@cn.ibm.com>
Committed: Thu Nov 3 10:45:24 2016 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../hadoop/hive/ql/exec/InPlaceUpdates.java     |  89 +++++++
 .../exec/spark/status/LocalSparkJobMonitor.java |   4 +-
 .../spark/status/RemoteSparkJobMonitor.java     |  27 ++-
 .../ql/exec/spark/status/SparkJobMonitor.java   | 233 +++++++++++++++++--
 .../hadoop/hive/ql/exec/tez/InPlaceUpdates.java |  82 -------
 .../hadoop/hive/ql/exec/tez/TezJobMonitor.java  |   1 +
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   2 +-
 8 files changed, 322 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/345353c0/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 80cd5ad..d287b45 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2766,6 +2766,8 @@ public class HiveConf extends Configuration {
         "hive.tez.exec.inplace.progress",
         true,
         "Updates tez job execution progress in-place in the terminal."),
+    SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true,
+        "Updates spark job execution progress in-place in the terminal."),
     TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f,
         "This is to override the tez setting with the same name"),
     TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MIN("hive.tez.task.scale.memory.reserve-fraction.min",

http://git-wip-us.apache.org/repos/asf/hive/blob/345353c0/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java
new file mode 100644
index 0000000..f59d8e2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import static org.fusesource.jansi.Ansi.ansi;
+import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
+import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
+import static org.fusesource.jansi.internal.CLibrary.isatty;
+
+import java.io.PrintStream;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.fusesource.jansi.Ansi;
+
+import jline.TerminalFactory;
+
+public class InPlaceUpdates {
+
+  public static final int MIN_TERMINAL_WIDTH = 94;
+
+  static boolean isUnixTerminal() {
+
+    String os = System.getProperty("os.name");
+    if (os.startsWith("Windows")) {
+      // we do not support Windows, we will revisit this if we really need it for windows.
+      return false;
+    }
+
+    // We must be on some unix variant..
+    // check if standard out is a terminal
+    try {
+      // isatty system call will return 1 if the file descriptor is terminal else 0
+      if (isatty(STDOUT_FILENO) == 0) {
+        return false;
+      }
+      if (isatty(STDERR_FILENO) == 0) {
+        return false;
+      }
+    } catch (NoClassDefFoundError ignore) {
+      // These errors happen if the JNI lib is not available for your platform.
+      return false;
+    } catch (UnsatisfiedLinkError ignore) {
+      // These errors happen if the JNI lib is not available for your platform.
+      return false;
+    }
+    return true;
+  }
+
+  public static boolean inPlaceEligible(HiveConf conf) {
+    String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+    boolean inPlaceUpdates = false;
+    if (engine.equals("tez")) {
+      inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
+    }
+    if (engine.equals("spark")) {
+      inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS);
+    }
+
+    // we need at least 80 chars wide terminal to display in-place updates properly
+    return inPlaceUpdates && !SessionState.getConsole().getIsSilent() && isUnixTerminal()
+        && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
+  }
+
+  public static void reprintLine(PrintStream out, String line) {
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+    out.flush();
+  }
+
+  public static void rePositionCursor(PrintStream ps) {
+    ps.print(ansi().cursorUp(0).toString());
+    ps.flush();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/345353c0/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index 5f0352a..b6d128b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -47,7 +47,7 @@ public class LocalSparkJobMonitor extends SparkJobMonitor {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
 
-    long startTime = System.currentTimeMillis();
+    startTime = System.currentTimeMillis();
 
     while (true) {
       try {
@@ -58,7 +58,7 @@ public class LocalSparkJobMonitor extends SparkJobMonitor {
 
         if (state == null) {
           long timeCount = (System.currentTimeMillis() - startTime)/1000;
-          if (timeCount > monitorTimeoutInteval) {
+          if (timeCount > monitorTimeoutInterval) {
            console.printError("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
             console.printError("Status: " + state);
             running = false;

http://git-wip-us.apache.org/repos/asf/hive/blob/345353c0/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 11f263b..bdb1527 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
+import java.util.Arrays;
 import java.util.Map;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -52,20 +53,17 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
 
-    long startTime = System.currentTimeMillis();
+    startTime = System.currentTimeMillis();
 
     while (true) {
       try {
         JobHandle.State state = sparkJobStatus.getRemoteJobState();
-        if (LOG.isDebugEnabled()) {
-          console.printInfo("state = " + state);
-        }
 
         switch (state) {
         case SENT:
         case QUEUED:
           long timeCount = (System.currentTimeMillis() - startTime) / 1000;
-          if ((timeCount > monitorTimeoutInteval)) {
+          if ((timeCount > monitorTimeoutInterval)) {
             console.printError("Job hasn't been submitted after " + timeCount + "s." +
                 " Aborting it.\nPossible reasons include network issues, " +
                "errors in remote driver or the cluster has no available resources, etc.\n" +
@@ -75,6 +73,9 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
             done = true;
             rc = 2;
           }
+          if (LOG.isDebugEnabled()) {
+            console.printInfo("state = " + state);
+          }
           break;
         case STARTED:
           JobExecutionStatus sparkJobState = sparkJobStatus.getState();
@@ -84,18 +85,20 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
               perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
               printAppInfo();
               // print job stages.
-              console.printInfo("\nQuery Hive on Spark job["
-                + sparkJobStatus.getJobId() + "] stages:");
-              for (int stageId : sparkJobStatus.getStageIds()) {
-                console.printInfo(Integer.toString(stageId));
-              }
+              console.printInfo("\nQuery Hive on Spark job[" + sparkJobStatus.getJobId() +
+                  "] stages: " + Arrays.toString(sparkJobStatus.getStageIds()));
 
               console.printInfo("\nStatus: Running (Hive on Spark job["
                 + sparkJobStatus.getJobId() + "])");
               running = true;
 
-              console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: "
-                + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
+              String format = "Job Progress Format\nCurrentTime StageId_StageAttemptId: "
+                  + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount";
+              if (!inPlaceUpdate) {
+                console.printInfo(format);
+              } else {
+                console.logInfo(format);
+              }
             }
 
             printStatus(progressMap, lastProgressMap);

http://git-wip-us.apache.org/repos/asf/hive/blob/345353c0/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 0b6b15b..d5b9b5d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -19,11 +19,16 @@
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.fusesource.jansi.Ansi;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashSet;
@@ -33,35 +38,142 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
+import static org.fusesource.jansi.Ansi.ansi;
+
 abstract class SparkJobMonitor {
 
   protected static final String CLASS_NAME = SparkJobMonitor.class.getName();
   protected static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-  protected static SessionState.LogHelper console = new SessionState.LogHelper(LOG);
+  protected transient final SessionState.LogHelper console;
   protected final PerfLogger perfLogger = SessionState.getPerfLogger();
   protected final int checkInterval = 1000;
-  protected final long monitorTimeoutInteval;
+  protected final long monitorTimeoutInterval;
 
   private final Set<String> completed = new HashSet<String>();
   private final int printInterval = 3000;
   private long lastPrintTime;
 
+  protected long startTime;
+
+  protected enum StageState {
+    PENDING,
+    RUNNING,
+    FINISHED
+  }
+
+  // in-place progress update related variables
+  protected final boolean inPlaceUpdate;
+  private int lines = 0;
+  private final PrintStream out;
+
+
+  private static final int COLUMN_1_WIDTH = 16;
+  private static final String HEADER_FORMAT = "%16s%10s %13s  %5s  %9s  %7s  %7s  %6s  ";
+  private static final String STAGE_FORMAT = "%-16s%10s %13s  %5s  %9s  %7s  %7s  %6s  ";
+  private static final String HEADER = String.format(HEADER_FORMAT,
+      "STAGES", "ATTEMPT", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED");
+  private static final int SEPARATOR_WIDTH = 86;
+  private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-");
+  private static final String FOOTER_FORMAT = "%-15s  %-30s %-4s  %-25s";
+  private static final int progressBarChars = 30;
+
+  private final NumberFormat secondsFormat = new DecimalFormat("#0.00");
+
   protected SparkJobMonitor(HiveConf hiveConf) {
-    monitorTimeoutInteval = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
+    monitorTimeoutInterval = hiveConf.getTimeVar(
+        HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
+    inPlaceUpdate = InPlaceUpdates.inPlaceEligible(hiveConf);
+    console = SessionState.getConsole();
+    out = SessionState.LogHelper.getInfoStream();
   }
 
   public abstract int startMonitor();
 
+  private void printStatusInPlace(Map<String, SparkStageProgress> progressMap) {
+
+    StringBuilder reportBuffer = new StringBuilder();
+
+    // Num of total and completed tasks
+    int sumTotal = 0;
+    int sumComplete = 0;
+
+    // position the cursor to line 0
+    repositionCursor();
+
+    // header
+    reprintLine(SEPARATOR);
+    reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN);
+    reprintLine(SEPARATOR);
+
+    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+    int idx = 0;
+    final int numKey = keys.size();
+    for (String s : keys) {
+      SparkStageProgress progress = progressMap.get(s);
+      final int complete = progress.getSucceededTaskCount();
+      final int total = progress.getTotalTaskCount();
+      final int running = progress.getRunningTaskCount();
+      final int failed = progress.getFailedTaskCount();
+      sumTotal += total;
+      sumComplete += complete;
+
+      StageState state = total > 0 ? StageState.PENDING : StageState.FINISHED;
+      if (complete > 0 || running > 0 || failed > 0) {
+        if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
+          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+        }
+        if (complete < total) {
+          state = StageState.RUNNING;
+        } else {
+          state = StageState.FINISHED;
+          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+          completed.add(s);
+        }
+      }
+
+      int div = s.indexOf('_');
+      String attempt = div > 0 ? s.substring(div + 1) : "-";
+      String stageName = "Stage-" + (div > 0 ? s.substring(0, div) : s);
+      String nameWithProgress = getNameWithProgress(stageName, complete, total);
+
+      final int pending = total - complete - running;
+      String stageStr = String.format(STAGE_FORMAT,
+          nameWithProgress, attempt, state, total, complete, running, pending, failed);
+      reportBuffer.append(stageStr);
+      if (idx++ != numKey - 1) {
+        reportBuffer.append("\n");
+      }
+    }
+    reprintMultiLine(reportBuffer.toString());
+    reprintLine(SEPARATOR);
+    final float progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
+    String footer = getFooter(numKey, completed.size(), progress, startTime);
+    reprintLineWithColorAsBold(footer, Ansi.Color.RED);
+    reprintLine(SEPARATOR);
+  }
+
   protected void printStatus(Map<String, SparkStageProgress> progressMap,
-    Map<String, SparkStageProgress> lastProgressMap) {
+      Map<String, SparkStageProgress> lastProgressMap) {
 
     // do not print duplicate status while still in middle of print interval.
     boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap);
-    boolean isPassedInterval = System.currentTimeMillis() <= lastPrintTime + printInterval;
-    if (isDuplicateState && isPassedInterval) {
+    boolean withinInterval = System.currentTimeMillis() <= lastPrintTime + printInterval;
+    if (isDuplicateState && withinInterval) {
       return;
     }
 
+    String report = getReport(progressMap);
+    if (inPlaceUpdate) {
+      printStatusInPlace(progressMap);
+      console.logInfo(report);
+    } else {
+      console.printInfo(report);
+    }
+
+    lastPrintTime = System.currentTimeMillis();
+  }
+
+  private String getReport(Map<String, SparkStageProgress> progressMap) {
     StringBuilder reportBuffer = new StringBuilder();
     SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
     String currentDate = dt.format(new Date());
@@ -82,9 +194,9 @@ abstract class SparkJobMonitor {
           completed.add(s);
 
           if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
-            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE);
+            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
           }
-          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE);
+          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
         }
        if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
           /* stage is started, but not complete */
@@ -93,24 +205,24 @@ abstract class SparkJobMonitor {
           }
           if (failed > 0) {
             reportBuffer.append(
-              String.format(
-                "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed, total));
+                String.format(
+                    "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed, total));
           } else {
             reportBuffer.append(
-              String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total));
+                String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total));
           }
         } else {
           /* stage is waiting for input/slots or complete */
           if (failed > 0) {
             /* tasks finished but some failed */
             reportBuffer.append(
-              String.format(
-                "%s: %d(-%d)/%d Finished with failed tasks\t",
-                stageName, complete, failed, total));
+                String.format(
+                    "%s: %d(-%d)/%d Finished with failed tasks\t",
+                    stageName, complete, failed, total));
           } else {
             if (complete == total) {
               reportBuffer.append(
-                String.format("%s: %d/%d Finished\t", stageName, complete, total));
+                  String.format("%s: %d/%d Finished\t", stageName, complete, total));
             } else {
               reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
             }
@@ -118,14 +230,12 @@ abstract class SparkJobMonitor {
         }
       }
     }
-
-    lastPrintTime = System.currentTimeMillis();
-    console.printInfo(reportBuffer.toString());
+    return reportBuffer.toString();
   }
 
   private boolean isSameAsPreviousProgress(
-    Map<String, SparkStageProgress> progressMap,
-    Map<String, SparkStageProgress> lastProgressMap) {
+      Map<String, SparkStageProgress> progressMap,
+      Map<String, SparkStageProgress> lastProgressMap) {
 
     if (lastProgressMap == null) {
       return false;
@@ -142,7 +252,7 @@ abstract class SparkJobMonitor {
         }
         for (String key : progressMap.keySet()) {
           if (!lastProgressMap.containsKey(key)
-            || !progressMap.get(key).equals(lastProgressMap.get(key))) {
+              || !progressMap.get(key).equals(lastProgressMap.get(key))) {
             return false;
           }
         }
@@ -150,4 +260,85 @@ abstract class SparkJobMonitor {
     }
     return true;
   }
+
+  private void repositionCursor() {
+    if (lines > 0) {
+      out.print(ansi().cursorUp(lines).toString());
+      out.flush();
+      lines = 0;
+    }
+  }
+
+  private void reprintLine(String line) {
+    InPlaceUpdates.reprintLine(out, line);
+    lines++;
+  }
+
+  private void reprintLineWithColorAsBold(String line, Ansi.Color color) {
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset()
+        .toString());
+    out.flush();
+    lines++;
+  }
+
+  private String getNameWithProgress(String s, int complete, int total) {
+    String result = "";
+    if (s != null) {
+      float percent = total == 0 ? 1.0f : (float) complete / (float) total;
+      // lets use the remaining space in column 1 as progress bar
+      int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
+      String trimmedVName = s;
+
+      // if the vertex name is longer than column 1 width, trim it down
+      if (s.length() > COLUMN_1_WIDTH) {
+        trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2);
+        result = trimmedVName + "..";
+      } else {
+        result = trimmedVName + " ";
+      }
+
+      int toFill = (int) (spaceRemaining * percent);
+      for (int i = 0; i < toFill; i++) {
+        result += ".";
+      }
+    }
+    return result;
+  }
+
+  // STAGES: 03/04            [==================>>-----] 86%  ELAPSED TIME: 1.71 s
+  private String getFooter(int keySize, int completedSize, float progress, long startTime) {
+    String verticesSummary = String.format("STAGES: %02d/%02d", completedSize, keySize);
+    String progressBar = getInPlaceProgressBar(progress);
+    final int progressPercent = (int) (progress * 100);
+    String progressStr = "" + progressPercent + "%";
+    float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000;
+    String elapsedTime = "ELAPSED TIME: " + secondsFormat.format(et) + " s";
+    String footer = String.format(FOOTER_FORMAT,
+        verticesSummary, progressBar, progressStr, elapsedTime);
+    return footer;
+  }
+
+  // [==================>>-----]
+  private String getInPlaceProgressBar(float percent) {
+    StringBuilder bar = new StringBuilder("[");
+    int remainingChars = progressBarChars - 4;
+    int completed = (int) (remainingChars * percent);
+    int pending = remainingChars - completed;
+    for (int i = 0; i < completed; i++) {
+      bar.append("=");
+    }
+    bar.append(">>");
+    for (int i = 0; i < pending; i++) {
+      bar.append("-");
+    }
+    bar.append("]");
+    return bar.toString();
+  }
+
+  private void reprintMultiLine(String line) {
+    int numLines = line.split("\r\n|\r|\n").length;
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+    out.flush();
+    lines += numLines;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/345353c0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
deleted file mode 100644
index 5b2d4e2..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.tez;
-
-import static org.fusesource.jansi.Ansi.ansi;
-import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.isatty;
-
-import java.io.PrintStream;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.fusesource.jansi.Ansi;
-
-import jline.TerminalFactory;
-
-public class InPlaceUpdates {
-
-  public static final int MIN_TERMINAL_WIDTH = 94;
-
-  static boolean isUnixTerminal() {
-
-    String os = System.getProperty("os.name");
-    if (os.startsWith("Windows")) {
-      // we do not support Windows, we will revisit this if we really need it for windows.
-      return false;
-    }
-
-    // We must be on some unix variant..
-    // check if standard out is a terminal
-    try {
-      // isatty system call will return 1 if the file descriptor is terminal else 0
-      if (isatty(STDOUT_FILENO) == 0) {
-        return false;
-      }
-      if (isatty(STDERR_FILENO) == 0) {
-        return false;
-      }
-    } catch (NoClassDefFoundError ignore) {
-      // These errors happen if the JNI lib is not available for your platform.
-      return false;
-    } catch (UnsatisfiedLinkError ignore) {
-      // These errors happen if the JNI lib is not available for your platform.
-      return false;
-    }
-    return true;
-  }
-
-  public static boolean inPlaceEligible(HiveConf conf) {
-    boolean inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
-
-    // we need at least 80 chars wide terminal to display in-place updates properly
-    return inPlaceUpdates && !SessionState.getConsole().getIsSilent() && isUnixTerminal()
-      && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
-  }
-
-  public static void reprintLine(PrintStream out, String line) {
-    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
-    out.flush();
-  }
-
-  public static void rePositionCursor(PrintStream ps) {
-    ps.print(ansi().cursorUp(0).toString());
-    ps.flush();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/345353c0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 211a281..bd935d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;

http://git-wip-us.apache.org/repos/asf/hive/blob/345353c0/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index dab4c6a..61b8bd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -125,7 +125,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionTask;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;
+import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.log.PerfLogger;


Mime
View raw message