hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hashut...@apache.org
Subject svn commit: r1436221 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: lib/ optimizer/
Date Mon, 21 Jan 2013 08:20:20 GMT
Author: hashutosh
Date: Mon Jan 21 08:20:20 2013
New Revision: 1436221

URL: http://svn.apache.org/viewvc?rev=1436221&view=rev
Log:
NPE in union processing followed by lateral view followed by 2 group bys (Navis via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java?rev=1436221&r1=1436220&r2=1436221&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java Mon Jan 21 08:20:20 2013
@@ -50,4 +50,18 @@ public class Utils {
     
     return ret_nd;
   }
+
+  /**
+   * Find the first node of a type from ancestor stack, starting from parents.
+   * Returns null if not found.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T findNode(Stack<Node> stack, Class<T> target) {
+    for (int i = stack.size() - 2; i >= 0; i--) {
+      if (target.isInstance(stack.get(i))) {
+        return (T) stack.get(i);
+      }
+    }
+    return null;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1436221&r1=1436220&r2=1436221&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Mon Jan 21 08:20:20 2013
@@ -862,7 +862,7 @@ public class GenMRFileSink1 implements N
           if (currUnionOp != null) {
             opTaskMap.put(null, currTask);
             ctx.setCurrTopOp(null);
-            GenMapRedUtils.initUnionPlan(ctx, currTask, false);
+            GenMapRedUtils.initUnionPlan(ctx, currUnionOp, currTask, false);
             return dest;
           }
         }
@@ -880,7 +880,7 @@ public class GenMRFileSink1 implements N
 
     if (currUnionOp != null) {
       opTaskMap.put(null, currTask);
-      GenMapRedUtils.initUnionPlan(ctx, currTask, false);
+      GenMapRedUtils.initUnionPlan(ctx, currUnionOp, currTask, false);
       return dest;
     }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java?rev=1436221&r1=1436220&r2=1436221&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java Mon Jan 21 08:20:20 2013
@@ -81,7 +81,7 @@ public class GenMRRedSink1 implements No
     } else {
       // This will happen in case of joins. The current plan can be thrown away
       // after being merged with the original plan
-      GenMapRedUtils.joinPlan(op, null, opMapTask, ctx, -1, false, false, false);
+      GenMapRedUtils.joinPlan(op, null, opMapTask, ctx, -1, false, false, null);
       currTask = opMapTask;
       ctx.setCurrTask(currTask);
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java?rev=1436221&r1=1436220&r2=1436221&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java Mon Jan 21 08:20:20 2013
@@ -71,8 +71,7 @@ public class GenMRRedSink2 implements No
     if (opMapTask == null) {
       GenMapRedUtils.splitPlan(op, ctx);
     } else {
-      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false,
-          false);
+      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false, null);
       currTask = opMapTask;
       ctx.setCurrTask(currTask);
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java?rev=1436221&r1=1436220&r2=1436221&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java Mon Jan 21 08:20:20 2013
@@ -26,9 +26,11 @@ import java.util.Stack;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Utils;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -58,9 +60,12 @@ public class GenMRRedSink3 implements No
     // union consisted on a bunch of map-reduce jobs, and it has been split at
     // the union
     Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
+    UnionOperator union = Utils.findNode(stack, UnionOperator.class);
+    assert union != null;
+
     Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
         .getMapCurrCtx();
-    GenMapRedCtx mapredCtx = mapCurrCtx.get(ctx.getCurrUnionOp());
+    GenMapRedCtx mapredCtx = mapCurrCtx.get(union);
 
     Task<? extends Serializable> unionTask = null;
     if(mapredCtx != null) {
@@ -81,7 +86,7 @@ public class GenMRRedSink3 implements No
     if (reducerTask == null) {
       // When the reducer is encountered for the first time
       if (plan.getReducer() == null) {
-        GenMapRedUtils.initUnionPlan(op, ctx, unionTask);
+        GenMapRedUtils.initUnionPlan(op, union, ctx, unionTask);
         // When union is followed by a multi-table insert
       } else {
         GenMapRedUtils.splitPlan(op, ctx);
@@ -90,9 +95,9 @@ public class GenMRRedSink3 implements No
       // The union is already initialized. However, the union is walked from
       // another input
       // initUnionPlan is idempotent
-      GenMapRedUtils.initUnionPlan(op, ctx, unionTask);
+      GenMapRedUtils.initUnionPlan(op, union, ctx, unionTask);
     } else {
-      GenMapRedUtils.joinUnionPlan(ctx, unionTask, reducerTask, false);
+      GenMapRedUtils.joinUnionPlan(ctx, union, unionTask, reducerTask, false);
       ctx.setCurrTask(reducerTask);
     }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java?rev=1436221&r1=1436220&r2=1436221&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java Mon Jan 21 08:20:20 2013
@@ -75,7 +75,7 @@ public class GenMRRedSink4 implements No
     if (opMapTask == null) {
       // When the reducer is encountered for the first time
       if (plan.getReducer() == null) {
-        GenMapRedUtils.initMapJoinPlan(op, ctx, true, false, true, -1);
+        GenMapRedUtils.initMapJoinPlan(op, ctx, true, null, true, -1);
         // When mapjoin is followed by a multi-table insert
       } else {
         GenMapRedUtils.splitPlan(op, ctx);
@@ -85,8 +85,7 @@ public class GenMRRedSink4 implements No
       // been initialized.
       // Initialize the current branch, and join with the original plan.
       assert plan.getReducer() != reducer;
-      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, false, true,
-          false);
+      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, false, true, null);
     }
 
     mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1436221&r1=1436220&r2=1436221&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Jan 21 08:20:20 2013
@@ -139,9 +139,9 @@ public final class GenMapRedUtils {
 
   public static void initMapJoinPlan(
     Operator<? extends OperatorDesc> op, GenMRProcContext ctx,
-    boolean readInputMapJoin, boolean readInputUnion, boolean setReducer, int pos)
+    boolean readInputMapJoin, UnionOperator currUnionOp, boolean setReducer, int pos)
     throws SemanticException {
-    initMapJoinPlan(op, ctx, readInputMapJoin, readInputUnion, setReducer, pos, false);
+    initMapJoinPlan(op, ctx, readInputMapJoin, currUnionOp, setReducer, pos, false);
   }
 
   /**
@@ -156,7 +156,7 @@ public final class GenMapRedUtils {
    */
   public static void initMapJoinPlan(Operator<? extends OperatorDesc> op,
       GenMRProcContext opProcCtx, boolean readInputMapJoin,
-      boolean readInputUnion, boolean setReducer, int pos, boolean createLocalPlan)
+      UnionOperator currUnionOp, boolean setReducer, int pos, boolean createLocalPlan)
       throws SemanticException {
     Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
       opProcCtx.getMapCurrCtx();
@@ -191,7 +191,7 @@ public final class GenMapRedUtils {
         opTaskMap.put(op, currTask);
       }
 
-      if (!readInputUnion) {
+      if (currUnionOp == null) {
         GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(currMapJoinOp);
         String taskTmpDir;
         TableDesc tt_desc;
@@ -212,7 +212,7 @@ public final class GenMapRedUtils {
         setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
         setupBucketMapJoinInfo(plan, currMapJoinOp, createLocalPlan);
       } else {
-        initUnionPlan(opProcCtx, currTask, false);
+        initUnionPlan(opProcCtx, currUnionOp, currTask, false);
       }
 
       opProcCtx.setCurrMapJoinOp(null);
@@ -305,7 +305,7 @@ public final class GenMapRedUtils {
    * @param opProcCtx
    *          processing context
    */
-  public static void initUnionPlan(ReduceSinkOperator op,
+  public static void initUnionPlan(ReduceSinkOperator op, UnionOperator currUnionOp,
       GenMRProcContext opProcCtx,
       Task<? extends Serializable> unionTask) throws SemanticException {
     Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
@@ -324,7 +324,7 @@ public final class GenMapRedUtils {
       plan.setNeedsTagging(true);
     }
 
-    initUnionPlan(opProcCtx, unionTask, false);
+    initUnionPlan(opProcCtx, currUnionOp, unionTask, false);
   }
 
   private static void setUnionPlan(GenMRProcContext opProcCtx,
@@ -373,11 +373,10 @@ public final class GenMapRedUtils {
    * It is a idempotent function to add various intermediate files as the source
    * for the union. The plan has already been created.
    */
-  public static void initUnionPlan(GenMRProcContext opProcCtx,
+  public static void initUnionPlan(GenMRProcContext opProcCtx, UnionOperator currUnionOp,
       Task<? extends Serializable> currTask, boolean local)
       throws SemanticException {
     MapredWork plan = (MapredWork) currTask.getWork();
-    UnionOperator currUnionOp = opProcCtx.getCurrUnionOp();
     // In case of lateral views followed by a join, the same tree
     // can be traversed more than one
     if (currUnionOp != null) {
@@ -391,11 +390,11 @@ public final class GenMapRedUtils {
    * join current union task to old task
    */
   public static void joinUnionPlan(GenMRProcContext opProcCtx,
+      UnionOperator currUnionOp,
       Task<? extends Serializable> currentUnionTask,
       Task<? extends Serializable> existingTask, boolean local)
       throws SemanticException {
     MapredWork plan = (MapredWork) existingTask.getWork();
-    UnionOperator currUnionOp = opProcCtx.getCurrUnionOp();
     assert currUnionOp != null;
     GenMRUnionCtx uCtx = opProcCtx.getUnionTask(currUnionOp);
     assert uCtx != null;
@@ -437,8 +436,8 @@ public final class GenMapRedUtils {
   public static void joinPlan(Operator<? extends OperatorDesc> op,
       Task<? extends Serializable> oldTask, Task<? extends Serializable> task,
       GenMRProcContext opProcCtx, int pos, boolean split,
-      boolean readMapJoinData, boolean readUnionData) throws SemanticException {
-    joinPlan(op, oldTask, task, opProcCtx, pos, split, readMapJoinData, readUnionData, false);
+      boolean readMapJoinData, UnionOperator currUnionOp) throws SemanticException {
+    joinPlan(op, oldTask, task, opProcCtx, pos, split, readMapJoinData, currUnionOp, false);
   }
 
   /**
@@ -458,7 +457,7 @@ public final class GenMapRedUtils {
   public static void joinPlan(Operator<? extends OperatorDesc> op,
       Task<? extends Serializable> oldTask, Task<? extends Serializable> task,
       GenMRProcContext opProcCtx, int pos, boolean split,
-      boolean readMapJoinData, boolean readUnionData, boolean createLocalWork)
+      boolean readMapJoinData, UnionOperator currUnionOp, boolean createLocalWork)
       throws SemanticException {
     Task<? extends Serializable> currTask = task;
     MapredWork plan = (MapredWork) currTask.getWork();
@@ -502,8 +501,8 @@ public final class GenMapRedUtils {
       opProcCtx.setCurrTopOp(currTopOp);
     } else if (opProcCtx.getCurrMapJoinOp() != null) {
       AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = opProcCtx.getCurrMapJoinOp();
-      if (readUnionData) {
-        initUnionPlan(opProcCtx, currTask, false);
+      if (currUnionOp != null) {
+        initUnionPlan(opProcCtx, currUnionOp, currTask, false);
       } else {
         GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=1436221&r1=1436220&r2=1436221&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Mon Jan 21 08:20:20 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Un
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Utils;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
@@ -100,12 +101,12 @@ public final class MapJoinFactory {
       // If the plan for this reducer does not exist, initialize the plan
       if (opMapTask == null) {
         assert currPlan.getReducer() == null;
-        GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, false, false, false, pos);
+        GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, false, null, false, pos);
       } else {
         // The current plan can be thrown away after being merged with the
         // original plan
         GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false,
-            false, false);
+            false, null);
         currTask = opMapTask;
         ctx.setCurrTask(currTask);
       }
@@ -155,7 +156,7 @@ public final class MapJoinFactory {
         // The current plan can be thrown away after being merged with the
         // original plan
         GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, opProcCtx, pos,
-            false, false, false);
+            false, false, null);
         currTask = opMapTask;
         opProcCtx.setCurrTask(currTask);
       }
@@ -302,12 +303,12 @@ public final class MapJoinFactory {
       // If the plan for this reducer does not exist, initialize the plan
       if (opMapTask == null) {
         assert currPlan.getReducer() == null;
-        GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, false, false, pos);
+        GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, null, false, pos);
       } else {
         // The current plan can be thrown away after being merged with the
         // original plan
         GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, ctx, pos, false,
-            true, false);
+            true, null);
         currTask = opMapTask;
         ctx.setCurrTask(currTask);
       }
@@ -336,7 +337,7 @@ public final class MapJoinFactory {
             .process(nd, stack, procCtx, nodeOutputs);
       }
 
-      UnionOperator currUnion = ctx.getCurrUnionOp();
+      UnionOperator currUnion = Utils.findNode(stack, UnionOperator.class);
       assert currUnion != null;
       ctx.getUnionTask(currUnion);
      AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) nd;
@@ -356,8 +357,7 @@ public final class MapJoinFactory {
       Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
 
       // union result cannot be a map table
-      boolean local = (pos == (mapJoin.getConf()).getPosBigTable()) ? false
-          : true;
+      boolean local = (pos != mapJoin.getConf().getPosBigTable());
       if (local) {
         throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_TABLE.getMsg());
       }
@@ -366,19 +366,18 @@ public final class MapJoinFactory {
       if (opMapTask == null) {
         assert currPlan.getReducer() == null;
         ctx.setCurrMapJoinOp(mapJoin);
-        GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, true, false, pos);
+        GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, currUnion, false, pos);
         ctx.setCurrUnionOp(null);
       } else {
         // The current plan can be thrown away after being merged with the
         // original plan
-        Task<? extends Serializable> uTask = ctx.getUnionTask(
-            ctx.getCurrUnionOp()).getUTask();
+        Task<? extends Serializable> uTask = ctx.getUnionTask(currUnion).getUTask();
         if (uTask.getId().equals(opMapTask.getId())) {
           GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false,
-              false, true);
+              false, currUnion);
         } else {
           GenMapRedUtils.joinPlan(mapJoin, uTask, opMapTask, ctx, pos, false,
-              false, true);
+              false, currUnion);
         }
         currTask = opMapTask;
         ctx.setCurrTask(currTask);



Mime
View raw message