hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mit...@apache.org
Subject hive git commit: HIVE-17576: Improve progress-reporting in TezProcessor (Mithun Radhakrishnan, reviewed by Owen O'Malley)
Date Mon, 09 Oct 2017 17:48:42 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-2 b871be1ce -> 14d2ec2eb


HIVE-17576: Improve progress-reporting in TezProcessor (Mithun Radhakrishnan, reviewed by Owen O'Malley)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/14d2ec2e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/14d2ec2e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/14d2ec2e

Branch: refs/heads/branch-2
Commit: 14d2ec2eb4267dd34536e286b26c003e0a69e466
Parents: b871be1
Author: Mithun Radhakrishnan <mithun@apache.org>
Authored: Mon Oct 9 10:39:24 2017 -0700
Committer: Mithun Radhakrishnan <mithun@apache.org>
Committed: Mon Oct 9 10:39:24 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/tez/TezProcessor.java   | 66 ++++++++++++++++++++
 1 file changed, 66 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/14d2ec2e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 486d43a..d59d484 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -66,7 +66,64 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
   private static final String CLASS_NAME = TezProcessor.class.getName();
   private final PerfLogger perfLogger = SessionState.getPerfLogger();
 
+  // TODO: Replace with direct call to ProgressHelper, when reliably available.
+  private static class ReflectiveProgressHelper {
+
+    Configuration conf;
+    Class<?> progressHelperClass = null;
+    Object progressHelper = null;
+
+    ReflectiveProgressHelper(Configuration conf,
+                             Map<String, LogicalInput> inputs,
+                             ProcessorContext processorContext,
+                             String processorName) {
+      this.conf = conf;
+      try {
+        progressHelperClass = this.conf.getClassByName("org.apache.tez.common.ProgressHelper");
+        progressHelper = progressHelperClass.getDeclaredConstructor(Map.class, ProcessorContext.class, String.class)
+                            .newInstance(inputs, processorContext, processorName);
+        LOG.debug("ProgressHelper initialized!");
+      }
+      catch(Exception ex) {
+        LOG.warn("Could not find ProgressHelper. " + ex);
+      }
+    }
+
+    private boolean isValid() {
+      return progressHelperClass != null && progressHelper != null;
+    }
+
+    void scheduleProgressTaskService(long delay, long period) {
+      if (!isValid()) {
+        LOG.warn("ProgressHelper uninitialized. Bailing on scheduleProgressTaskService()");
+        return;
+      }
+      try {
+        progressHelperClass.getDeclaredMethod("scheduleProgressTaskService", long.class, long.class)
+            .invoke(progressHelper, delay, period);
+        LOG.debug("scheduleProgressTaskService() called!");
+      } catch (Exception exception) {
+        LOG.warn("Could not scheduleProgressTaskService.", exception);
+      }
+    }
+
+    void shutDownProgressTaskService() {
+      if (!isValid()) {
+        LOG.warn("ProgressHelper uninitialized. Bailing on scheduleProgressTaskService()");
+        return;
+      }
+      try {
+        progressHelperClass.getDeclaredMethod("shutDownProgressTaskService").invoke(progressHelper);
+        LOG.debug("shutDownProgressTaskService() called!");
+      }
+      catch (Exception exception) {
+        LOG.warn("Could not shutDownProgressTaskService.", exception);
+      }
+    }
+  }
+
   protected ProcessorContext processorContext;
+  private ReflectiveProgressHelper progressHelper;
 
   protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
   protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
@@ -87,6 +144,9 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
     // we have to close in the processor's run method, because tez closes inputs
     // before calling close (TEZ-955) and we might need to read inputs
     // when we flush the pipeline.
+      if (progressHelper != null) {
+        progressHelper.shutDownProgressTaskService();
+      }
   }
 
   @Override
@@ -154,6 +214,11 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
       if (aborted.get()) {
         return;
       }
+
+      // leverage TEZ-3437: Improve synchronization and the progress report behavior.
+      progressHelper = new ReflectiveProgressHelper(jobConf, inputs, getContext(), this.getClass().getSimpleName());
+
+
       // There should be no blocking operation in RecordProcessor creation,
       // otherwise the abort operation will not register since they are synchronized on the same
       // lock.
@@ -164,6 +229,7 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
       }
     }
 
+    progressHelper.scheduleProgressTaskService(0, 100);
     if (!aborted.get()) {
       initializeAndRunProcessor(inputs, outputs);
     }


Mime
View raw message