hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sze...@apache.org
Subject svn commit: r1640654 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: HashTableLoader.java SparkReduceRecordHandler.java
Date Thu, 20 Nov 2014 02:55:46 GMT
Author: szehon
Date: Thu Nov 20 02:55:46 2014
New Revision: 1640654

URL: http://svn.apache.org/r1640654
Log:
HIVE-8883 : Investigate test failures on auto_join30.q [Spark Branch] (Chao Sun via Szehon)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java?rev=1640654&r1=1640653&r2=1640654&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
(original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
Thu Nov 20 02:55:46 2014
@@ -67,7 +67,13 @@ public class HashTableLoader implements 
       MapJoinTableContainer[] mapJoinTables,
       MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException
{
 
-    String currentInputPath = context.getCurrentInputPath().toString();
+    // Note: it's possible that a MJ operator is in a ReduceWork, in which case the
+    // currentInputPath will be null. But, since currentInputPath is only interesting
+    // for bucket join case, and for bucket join the MJ operator will always be in
+    // a MapWork, this should be OK.
+    String currentInputPath =
+        context.getCurrentInputPath() == null ? null : context.getCurrentInputPath().toString();
+
     LOG.info("******* Load from HashTable for input file: " + currentInputPath);
     MapredLocalWork localWork = context.getLocalWork();
     try {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1640654&r1=1640653&r2=1640654&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
(original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
Thu Nov 20 02:55:46 2014
@@ -34,11 +34,13 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -101,6 +103,7 @@ public class SparkReduceRecordHandler ex
   private StructObjectInspector[] valueStructInspectors;
   /* this is only used in the error code path */
   private List<VectorExpressionWriter>[] valueStringWriters;
+  private MapredLocalWork localWork = null;
 
   public void init(JobConf job, OutputCollector output, Reporter reporter) {
     super.init(job, output, reporter);
@@ -197,8 +200,9 @@ public class SparkReduceRecordHandler ex
     }
 
     ExecMapperContext execContext = new ExecMapperContext(job);
+    localWork = gWork.getMapRedLocalWork();
     execContext.setJc(jc);
-    execContext.setLocalWork(gWork.getMapRedLocalWork());
+    execContext.setLocalWork(localWork);
     reducer.setExecContext(execContext);
 
     reducer.setReporter(rp);
@@ -209,6 +213,14 @@ public class SparkReduceRecordHandler ex
     try {
       LOG.info(reducer.dump(0));
       reducer.initialize(jc, rowObjectInspector);
+
+      if (localWork != null) {
+        for (Operator<? extends OperatorDesc> dummyOp : localWork.getDummyParentOp())
{
+          dummyOp.setExecContext(execContext);
+          dummyOp.initialize(jc, null);
+        }
+      }
+
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
@@ -218,6 +230,7 @@ public class SparkReduceRecordHandler ex
         throw new RuntimeException("Reduce operator initialization failed", e);
       }
     }
+
   }
 
   @Override
@@ -416,6 +429,13 @@ public class SparkReduceRecordHandler ex
       }
 
       reducer.close(abort);
+
+      if (localWork != null) {
+        for (Operator<? extends OperatorDesc> dummyOp : localWork.getDummyParentOp())
{
+          dummyOp.close(abort);
+        }
+      }
+
       ReportStats rps = new ReportStats(rp, jc);
       reducer.preorderMap(rps);
 



Mime
View raw message