hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gunt...@apache.org
Subject svn commit: r1580984 - /hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
Date Mon, 24 Mar 2014 18:42:30 GMT
Author: gunther
Date: Mon Mar 24 18:42:30 2014
New Revision: 1580984

URL: http://svn.apache.org/r1580984
Log:
HIVE-6700: In some queries inputs are closed on Tez before the operator pipeline is flushed
(Gunther Hagleitner, reviewed by Vikram Dixit K and Siddharth Seth)

Modified:
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1580984&r1=1580983&r2=1580984&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Mon Mar 24 18:42:30 2014
@@ -74,9 +74,9 @@ public class TezProcessor implements Log
 
   @Override
   public void close() throws IOException {
-    if(rproc != null){
-      rproc.close();
-    }
+    // we have to close in the processor's run method, because tez closes inputs 
+    // before calling close (TEZ-955) and we might need to read inputs
+    // when we flush the pipeline.
   }
 
   @Override
@@ -123,42 +123,48 @@ public class TezProcessor implements Log
   @Override
   public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
       throws Exception {
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
-    // in case of broadcast-join read the broadcast edge inputs
-    // (possibly asynchronously)
-
-    LOG.info("Running task: " + processorContext.getUniqueIdentifier());
-
-    if (isMap) {
-      rproc = new MapRecordProcessor();
-      MRInputLegacy mrInput = getMRInput(inputs);
-      try {
-        mrInput.init();
-      } catch (IOException e) {
-        throw new RuntimeException("Failed while initializing MRInput", e);
+    try{
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
+      // in case of broadcast-join read the broadcast edge inputs
+      // (possibly asynchronously)
+      
+      LOG.info("Running task: " + processorContext.getUniqueIdentifier());
+      
+      if (isMap) {
+        rproc = new MapRecordProcessor();
+        MRInputLegacy mrInput = getMRInput(inputs);
+        try {
+          mrInput.init();
+        } catch (IOException e) {
+          throw new RuntimeException("Failed while initializing MRInput", e);
+        }
+      } else {
+        rproc = new ReduceRecordProcessor();
       }
-    } else {
-      rproc = new ReduceRecordProcessor();
-    }
 
-    TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
-    // Start the actual Inputs. After MRInput initialization.
-    for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
-      if (!cacheAccess.isInputCached(inputEntry.getKey())) {
-        inputEntry.getValue().start();
-      } else {
-        LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+      TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
+      // Start the actual Inputs. After MRInput initialization.
+      for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
+        if (!cacheAccess.isInputCached(inputEntry.getKey())) {
+          inputEntry.getValue().start();
+        } else {
+          LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+        }
+      }
+      
+      // Outputs will be started later by the individual Processors.
+      
+      MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
+      rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
+      rproc.run();
+
+      //done - output does not need to be committed as hive does not use outputcommitter
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
+    } finally {
+      if(rproc != null){
+        rproc.close();
       }
     }
-
-    // Outputs will be started later by the individual Processors.
-
-    MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
-    rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
-    rproc.run();
-
-    //done - output does not need to be committed as hive does not use outputcommitter
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
   }
 
   /**



Mime
View raw message