hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gunt...@apache.org
Subject svn commit: r1549374 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql: lib/DefaultGraphWalker.java parse/GenTezProcContext.java parse/GenTezWork.java parse/GenTezWorkWalker.java parse/TezCompiler.java
Date Mon, 09 Dec 2013 03:00:13 GMT
Author: gunther
Date: Mon Dec  9 03:00:13 2013
New Revision: 1549374

URL: http://svn.apache.org/r1549374
Log:
HIVE-5984: Multi insert statement fails on Tez (Gunther Hagleitner)

Added:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java?rev=1549374&r1=1549373&r2=1549374&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java Mon Dec  9 03:00:13 2013
@@ -36,9 +36,9 @@ import org.apache.hadoop.hive.ql.parse.S
 public class DefaultGraphWalker implements GraphWalker {
 
   protected Stack<Node> opStack;
-  private final List<Node> toWalk = new ArrayList<Node>();
-  private final HashMap<Node, Object> retMap = new HashMap<Node, Object>();
-  private final Dispatcher dispatcher;
+  protected final List<Node> toWalk = new ArrayList<Node>();
+  protected final HashMap<Node, Object> retMap = new HashMap<Node, Object>();
+  protected final Dispatcher dispatcher;
 
   /**
    * Constructor.

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1549374&r1=1549373&r2=1549374&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Mon Dec  9 03:00:13 2013
@@ -19,18 +19,16 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.io.Serializable;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Stack;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -61,10 +59,6 @@ public class GenTezProcContext implement
   public final Set<ReadEntity> inputs;
   public final Set<WriteEntity> outputs;
 
-  // rootOperators are all the table scan operators in sequence
-  // of traversal
-  public final Deque<Operator<? extends OperatorDesc>> rootOperators;
-
   // holds the root of the operator tree we're currently processing
   // this could be a table scan, but also a join, ptf, etc (i.e.:
   // first operator of a reduce task.
@@ -98,6 +92,9 @@ public class GenTezProcContext implement
   // a map that maintains operator (file-sink or reduce-sink) to work mapping
   public final Map<Operator<?>, BaseWork> operatorWorkMap;
 
+  // a map to keep track of which root generated which work
+  public final Map<Operator<?>, BaseWork> rootToWorkMap;
+
   // we need to keep the original list of operators in the map join to know
   // what position in the mapjoin the different parent work items will have.
   public final Map<MapJoinOperator, List<Operator<?>>> mapJoinParentMap;
@@ -108,19 +105,10 @@ public class GenTezProcContext implement
   // used to group dependent tasks for multi table inserts
   public final DependencyCollectionTask dependencyTask;
 
-  // root of last multi child operator encountered
-  public Stack<Operator<?>> lastRootOfMultiChildOperator;
-
-  // branches of current multi-child operator
-  public Stack<Integer> currentBranchCount;
-
-  // work generated for last multi-child operator
-  public Stack<BaseWork> lastWorkForMultiChildOperator;
-
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
-      Set<ReadEntity> inputs, Set<WriteEntity> outputs, Deque<Operator<?>> rootOperators) {
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
 
     this.conf = conf;
     this.parseContext = parseContext;
@@ -130,16 +118,15 @@ public class GenTezProcContext implement
     this.outputs = outputs;
     this.currentTask = (TezTask) TaskFactory.get(new TezWork(), conf);
     this.leafOperatorToFollowingWork = new HashMap<Operator<?>, BaseWork>();
-    this.rootOperators = rootOperators;
     this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>();
     this.linkWorkWithReduceSinkMap = new HashMap<BaseWork, List<ReduceSinkOperator>>();
     this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
+    this.rootToWorkMap = new HashMap<Operator<?>, BaseWork>();
     this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>();
     this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
     this.dependencyTask = (DependencyCollectionTask)
         TaskFactory.get(new DependencyCollectionWork(), conf);
-    this.lastRootOfMultiChildOperator = new Stack<Operator<?>>();
-    this.currentBranchCount = new Stack<Integer>();
-    this.lastWorkForMultiChildOperator = new Stack<BaseWork>();
+
+    rootTasks.add(currentTask);
   }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1549374&r1=1549373&r2=1549374&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Mon Dec  9 03:00:13 2013
@@ -48,9 +48,9 @@ public class GenTezWork implements NodeP
 
   static final private Log LOG = LogFactory.getLog(GenTezWork.class.getName());
 
+  // sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...)
   private int sequenceNumber = 0;
 
-  @SuppressWarnings("unchecked")
   @Override
   public Object process(Node nd, Stack<Node> stack,
       NodeProcessorCtx procContext, Object... nodeOutputs)
@@ -62,130 +62,39 @@ public class GenTezWork implements NodeP
     // a new vertex.
     Operator<?> operator = (Operator<?>) nd;
 
-    TezWork tezWork = context.currentTask.getWork();
-    if (!context.rootTasks.contains(context.currentTask)) {
-      context.rootTasks.add(context.currentTask);
-    }
-
     // root is the start of the operator pipeline we're currently
     // packing into a vertex, typically a table scan, union or join
     Operator<?> root = context.currentRootOperator;
-    if (root == null) {
-      // null means that we're starting with a new table scan
-      // the graph walker walks the rootOperators in the same
-      // order so we can just take the next
-      context.preceedingWork = null;
-
-      // if there are branches remaining we can't pop the next
-      // root operator yet.
-      if (context.currentBranchCount.isEmpty()
-          || (!context.lastWorkForMultiChildOperator.isEmpty()
-              && context.lastWorkForMultiChildOperator.peek() == null)) {
-        root = context.rootOperators.pop();
-      }
-    }
 
     LOG.debug("Root operator: " + root);
     LOG.debug("Leaf operator: " + operator);
 
+    TezWork tezWork = context.currentTask.getWork();
+
     // Right now the work graph is pretty simple. If there is no
     // Preceding work we have a root and will generate a map
     // vertex. If there is a preceding work we will generate
     // a reduce vertex
     BaseWork work;
-    if (context.preceedingWork == null) {
-      if (root == null) {
-        // this is the multi-insert case. we need to reuse the last
-        // table scan work.
-        root = context.lastRootOfMultiChildOperator.peek();
-        work = context.lastWorkForMultiChildOperator.peek();
-        LOG.debug("Visiting additional branch in: "+root);
-
-      } else {
-        assert root.getParentOperators().isEmpty();
-        MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
-        LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
-
-        // map work starts with table scan operators
-        assert root instanceof TableScanOperator;
-        String alias = ((TableScanOperator)root).getConf().getAlias();
-
-        GenMapRedUtils.setMapWork(mapWork, context.parseContext,
-            context.inputs, null, root, alias, context.conf, false);
-        tezWork.add(mapWork);
-        work = mapWork;
-
-        // remember this table scan and work item. this is needed for multiple
-        // insert statements where multiple operator pipelines hang of a single
-        // table scan
-        if (!context.lastWorkForMultiChildOperator.isEmpty()
-            && context.lastWorkForMultiChildOperator.peek() == null) {
-          LOG.debug("Capturing current work for 'multiple branches' case");
-          context.lastWorkForMultiChildOperator.pop();
-          context.lastWorkForMultiChildOperator.push(work);
-        }
-      }
-
-      if (!context.currentBranchCount.isEmpty()) {
-        // we've handled one branch. Adjust the counts.
-        int branches = context.currentBranchCount.pop();
-        if (--branches != 0) {
-          LOG.debug("Remaining branches: "+branches);
-          context.currentBranchCount.push(branches);
-        } else {
-          LOG.debug("No more remaining branches.");
-          context.lastRootOfMultiChildOperator.pop();
-          context.lastWorkForMultiChildOperator.pop();
-        }
-      }
-
+    if (context.rootToWorkMap.containsKey(root)) {
+      // having seen the root operator before means there was a branch in the
+      // operator graph. There's typically two reasons for that: a) mux/demux
+      // b) multi insert. Mux/Demux will hit the same leaf again, multi insert
+      // will result into a vertex with multiple FS or RS operators.
+
+      // At this point we don't have to do anything special in this case. Just
+      // run through the regular paces w/o creating a new task.
+      work = context.rootToWorkMap.get(root);
     } else {
-      assert !root.getParentOperators().isEmpty();
-      ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
-      LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
-      reduceWork.setReducer(root);
-      reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
-
-      // All parents should be reduce sinks. We pick the one we just walked
-      // to choose the number of reducers. In the join/union case they will
-      // all be -1. In sort/order case where it matters there will be only
-      // one parent.
-      assert context.parentOfRoot instanceof ReduceSinkOperator;
-      ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
-
-      LOG.debug("Setting up reduce sink: " + reduceSink);
-
-      reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
-
-      // need to fill in information about the key and value in the reducer
-      GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
-
-      // remember which parent belongs to which tag
-      reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
-           context.preceedingWork.getName());
-
-      // remember the output name of the reduce sink
-      reduceSink.getConf().setOutputName(reduceWork.getName());
-
-      tezWork.add(reduceWork);
-      tezWork.connect(
-          context.preceedingWork,
-          reduceWork, EdgeType.SIMPLE_EDGE);
-
-      work = reduceWork;
-
-      // remember this work item. this is needed for multiple
-      // insert statements where multiple operator pipelines hang of a forward
-      // operator
-      if (!context.lastWorkForMultiChildOperator.isEmpty()
-          && context.lastWorkForMultiChildOperator.peek() == null) {
-        LOG.debug("Capturing current work for 'multiple branches' case");
-        context.lastWorkForMultiChildOperator.pop();
-        context.lastWorkForMultiChildOperator.push(work);
+      // create a new vertex
+      if (context.preceedingWork == null) {
+        work = createMapWork(context, root, tezWork);
+      } else {
+        work = createReduceWork(context, root, tezWork);
       }
+      context.rootToWorkMap.put(root, work);
     }
 
-    // We're scanning the operator from table scan to final file sink.
     // We're scanning a tree from roots to leaf (this is not technically
     // correct, demux and mux operators might form a diamond shape, but
     // we will only scan one path and ignore the others, because the
@@ -233,20 +142,15 @@ public class GenTezWork implements NodeP
       context.parentOfRoot = operator;
       context.currentRootOperator = operator.getChildOperators().get(0);
       context.preceedingWork = work;
-    } else {
-      LOG.debug("Leaf operator - resetting context: " + context.currentRootOperator);
-      context.parentOfRoot = null;
-      context.currentRootOperator = null;
-      context.preceedingWork = null;
     }
 
     /*
      * this happens in case of map join operations.
      * The tree looks like this:
      *
-     *        RS <--- we are here perhaps
-     *        |
-     *      MapJoin
+     *       RS <--- we are here perhaps
+     *       |
+     *    MapJoin
      *    /     \
      *  RS       TS
      *  /
@@ -266,10 +170,10 @@ public class GenTezWork implements NodeP
       }
       for (BaseWork parentWork : linkWorkList) {
         tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
-        
+
         // need to set up output name for reduce sink not that we know the name
         // of the downstream work
-        for (ReduceSinkOperator r: 
+        for (ReduceSinkOperator r:
                context.linkWorkWithReduceSinkMap.get(parentWork)) {
           r.getConf().setOutputName(work.getName());
         }
@@ -279,4 +183,65 @@ public class GenTezWork implements NodeP
     return null;
   }
 
+  private ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root,
+      TezWork tezWork) {
+    assert !root.getParentOperators().isEmpty();
+    ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
+    LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
+    reduceWork.setReducer(root);
+    reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
+
+    // All parents should be reduce sinks. We pick the one we just walked
+    // to choose the number of reducers. In the join/union case they will
+    // all be -1. In sort/order case where it matters there will be only
+    // one parent.
+    assert context.parentOfRoot instanceof ReduceSinkOperator;
+    ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
+
+    reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
+
+    setupReduceSink(context, reduceWork, reduceSink);
+
+    tezWork.add(reduceWork);
+    tezWork.connect(
+        context.preceedingWork,
+        reduceWork, EdgeType.SIMPLE_EDGE);
+
+    return reduceWork;
+  }
+
+  private void setupReduceSink(GenTezProcContext context, ReduceWork reduceWork,
+      ReduceSinkOperator reduceSink) {
+
+    LOG.debug("Setting up reduce sink: " + reduceSink
+        + " with following reduce work: " + reduceWork.getName());
+
+    // need to fill in information about the key and value in the reducer
+    GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
+
+    // remember which parent belongs to which tag
+    reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
+         context.preceedingWork.getName());
+
+    // remember the output name of the reduce sink
+    reduceSink.getConf().setOutputName(reduceWork.getName());
+  }
+
+  private MapWork createMapWork(GenTezProcContext context, Operator<?> root,
+      TezWork tezWork) throws SemanticException {
+    assert root.getParentOperators().isEmpty();
+    MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
+    LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
+
+    // map work starts with table scan operators
+    assert root instanceof TableScanOperator;
+    String alias = ((TableScanOperator)root).getConf().getAlias();
+
+    GenMapRedUtils.setMapWork(mapWork, context.parseContext,
+        context.inputs, null, root, alias, context.conf, false);
+    tezWork.add(mapWork);
+
+    return mapWork;
+  }
+
 }

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java?rev=1549374&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java Mon Dec  9 03:00:13 2013
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/**
+ * Walks the operator tree in DFS fashion.
+ */
+public class GenTezWorkWalker extends DefaultGraphWalker {
+
+  private final GenTezProcContext ctx;
+
+  /**
+   * constructor of the walker - the dispatcher is passed.
+   *
+   * @param disp the dispatcher to be called for each node visited
+   * @param ctx the context where we'll set the current root operator
+   *
+   */
+  public GenTezWorkWalker(Dispatcher disp, GenTezProcContext ctx) {
+    super(disp);
+    this.ctx = ctx;
+  }
+
+  private void setRoot(Node nd) {
+    ctx.currentRootOperator = (Operator<? extends OperatorDesc>) nd;
+    ctx.preceedingWork = null;
+    ctx.parentOfRoot = null;
+  }
+
+  /**
+   * starting point for walking.
+   *
+   * @throws SemanticException
+   */
+  @Override
+  public void startWalking(Collection<Node> startNodes,
+      HashMap<Node, Object> nodeOutput) throws SemanticException {
+    toWalk.addAll(startNodes);
+    while (toWalk.size() > 0) {
+      Node nd = toWalk.remove(0);
+      setRoot(nd);
+      walk(nd);
+      if (nodeOutput != null) {
+        nodeOutput.put(nd, retMap.get(nd));
+      }
+    }
+  }
+
+  /**
+   * Walk the given operator.
+   *
+   * @param nd operator being walked
+   */
+  @Override
+  public void walk(Node nd) throws SemanticException {
+    List<? extends Node> children = nd.getChildren();
+
+    // maintain the stack of operators encountered
+    opStack.push(nd);
+    Boolean skip = dispatchAndReturn(nd, opStack);
+
+    // save some positional state
+    Operator<? extends OperatorDesc> currentRoot = ctx.currentRootOperator;
+    Operator<? extends OperatorDesc> parentOfRoot = ctx.parentOfRoot;
+    BaseWork preceedingWork = ctx.preceedingWork;
+
+    if (skip == null || !skip) {
+      // move all the children to the front of queue
+      for (Node ch : children) {
+
+        // and restore the state before walking each child
+        ctx.currentRootOperator = currentRoot;
+        ctx.parentOfRoot = parentOfRoot;
+        ctx.preceedingWork = preceedingWork;
+
+        walk(ch);
+      }
+    }
+
+    // done with this operator
+    opStack.pop();
+  }
+}

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1549374&r1=1549373&r2=1549374&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Mon Dec  9 03:00:13 2013
@@ -34,12 +34,10 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.ForwardOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -113,12 +111,8 @@ public class TezCompiler extends TaskCom
     ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
     GenTezWork genTezWork = new GenTezWork();
 
-    // Sequence of TableScan operators to be walked
-    Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
-    deque.addAll(pCtx.getTopOps().values());
-
     GenTezProcContext procCtx = new GenTezProcContext(
-        conf, tempParseContext, mvTask, rootTasks, inputs, outputs, deque);
+        conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
 
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
@@ -147,46 +141,12 @@ public class TezCompiler extends TaskCom
       }
     });
 
-    opRules.put(new RuleRegExp("Setup table scan",
-        TableScanOperator.getOperatorName() + "%"), new NodeProcessor()
-    {
-      @Override
-      public Object process(Node n, Stack<Node> s,
-          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-        GenTezProcContext context = (GenTezProcContext) procCtx;
-        TableScanOperator tableScan = (TableScanOperator) n;
-        LOG.debug("TableScan operator ("+tableScan
-            +"). Number of branches: "+tableScan.getNumChild());
-        context.lastRootOfMultiChildOperator.push(tableScan);
-        context.currentBranchCount.push(tableScan.getNumChild());
-        context.lastWorkForMultiChildOperator.push(null);
-        return null;
-      }
-    });
-
-    opRules.put(new RuleRegExp("Handle Forward opertor",
-        ForwardOperator.getOperatorName() + "%"), new NodeProcessor()
-    {
-      @Override
-      public Object process(Node n, Stack<Node> s,
-          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-        GenTezProcContext context = (GenTezProcContext) procCtx;
-        ForwardOperator forward = (ForwardOperator) n;
-        LOG.debug("Forward operator ("+forward+
-            "). Number of branches: "+forward.getNumChild());
-        context.lastRootOfMultiChildOperator.push(context.currentRootOperator);
-        context.currentBranchCount.push(forward.getNumChild());
-        context.lastWorkForMultiChildOperator.push(null);
-        return null;
-      }
-    });
-
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
     List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pCtx.getTopOps().values());
-    GraphWalker ogw = new TezWalker(disp);
+    GraphWalker ogw = new GenTezWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
   }
 



Mime
View raw message