hive-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1612125 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/ optimizer/spark/ parse/ parse/spark/ plan/
Date: Sun, 20 Jul 2014 17:33:06 GMT
Author: xuefu
Date: Sun Jul 20 17:33:05 2014
New Revision: 1612125

URL: http://svn.apache.org/r1612125
Log:
HIVE-7331: Spark compiler ground work

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompilerOld.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
Removed:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1612125&r1=1612124&r2=1612125&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Sun Jul 20 17:33:05 2014
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hive.ql.plan.Fu
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -101,6 +103,7 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<IndexMetadataChangeWork>(IndexMetadataChangeWork.class,
         IndexMetadataChangeTask.class));
     taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class));
+    taskvec.add(new TaskTuple<SparkWork>(SparkWork.class, SparkTask.class));
 
   }
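
With SparkWork registered in the task vector alongside the MapReduce and Tez entries, TaskFactory.get can resolve a SparkWork plan into a SparkTask. A minimal usage sketch, assuming a HiveConf named conf is at hand and using an illustrative query-id string (it mirrors the construction that appears in GenSparkProcContext below):

    // Sketch: asking the factory for the task backing a SparkWork now yields a SparkTask.
    SparkWork sparkWork = new SparkWork("example-query-id");   // query id value is illustrative
    SparkTask sparkTask = (SparkTask) TaskFactory.get(sparkWork, conf);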
 

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java?rev=1612125&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java Sun Jul 20 17:33:05 2014
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkProcContext;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+public class SparkReduceSinkMapJoinProc implements NodeProcessor {
+
+  protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
+
+  /* (non-Javadoc)
+   * This processor handles the RS-MJ pattern that occurs in Spark on the
+   * small/hash table side of a map join. The work that the RS belongs to
+   * must be connected to the MJ work via a broadcast edge.
+   * We should not walk further down the tree when we encounter this pattern,
+   * because the type of the work (map work or reduce work) has to be
+   * determined from the big table side: it may be map work (no shuffle
+   * needed) or reduce work.
+   */
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext, Object... nodeOutputs)
+      throws SemanticException {
+    GenSparkProcContext context = (GenSparkProcContext) procContext;
+    MapJoinOperator mapJoinOp = (MapJoinOperator)nd;
+
+    if (stack.size() < 2 || !(stack.get(stack.size() - 2) instanceof ReduceSinkOperator)) {
+      context.currentMapJoinOperators.add(mapJoinOp);
+      return null;
+    }
+
+    context.preceedingWork = null;
+    context.currentRootOperator = null;
+
+    ReduceSinkOperator parentRS = (ReduceSinkOperator)stack.get(stack.size() - 2);
+    // remove the tag for in-memory side of mapjoin
+    parentRS.getConf().setSkipTag(true);
+    parentRS.setSkipTag(true);
+    // remember the original parent list before we start modifying it.
+    if (!context.mapJoinParentMap.containsKey(mapJoinOp)) {
+      List<Operator<?>> parents = new ArrayList<Operator<?>>(mapJoinOp.getParentOperators());
+      context.mapJoinParentMap.put(mapJoinOp, parents);
+    }
+
+    List<BaseWork> mapJoinWork = null;
+
+    /*
+     *  If work was already generated for the big-table side of the mapjoin,
+     *  we need to hook the work generated for the RS (associated with the
+     *  RS-MJ pattern) up to that pre-existing work.
+     *
+     *  Otherwise, we need to remember that the mapjoin op has to be linked
+     *  to the RS work (associated with the RS-MJ pattern) later.
+     *
+     */
+    mapJoinWork = context.mapJoinWorkMap.get(mapJoinOp);
+    BaseWork parentWork;
+    if (context.unionWorkMap.containsKey(parentRS)) {
+      parentWork = context.unionWorkMap.get(parentRS);
+    } else {
+      assert context.childToWorkMap.get(parentRS).size() == 1;
+      parentWork = context.childToWorkMap.get(parentRS).get(0);
+    }
+
+    // set the link between mapjoin and parent vertex
+    int pos = context.mapJoinParentMap.get(mapJoinOp).indexOf(parentRS);
+    if (pos == -1) {
+      throw new SemanticException("Cannot find position of parent in mapjoin");
+    }
+    LOG.debug("Mapjoin "+mapJoinOp+", pos: "+pos+" --> "+parentWork.getName());
+    mapJoinOp.getConf().getParentToInput().put(pos, parentWork.getName());
+
+    int numBuckets = -1;
+/*    EdgeType edgeType = EdgeType.BROADCAST_EDGE;
+    if (mapJoinOp.getConf().isBucketMapJoin()) {
+
+      // disable auto parallelism for bucket map joins
+      parentRS.getConf().setAutoParallel(false);
+
+      numBuckets = (Integer) mapJoinOp.getConf().getBigTableBucketNumMapping().values().toArray()[0];
+      if (mapJoinOp.getConf().getCustomBucketMapJoin()) {
+        edgeType = EdgeType.CUSTOM_EDGE;
+      } else {
+        edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
+      }
+    }*/
+    SparkEdgeProperty edgeProp = new SparkEdgeProperty(0/*null, edgeType, numBuckets*/);
+
+    if (mapJoinWork != null) {
+      for (BaseWork myWork: mapJoinWork) {
+        // link the work with the work associated with the reduce sink that triggered this rule
+        SparkWork sparkWork = context.currentTask.getWork();
+        LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName());
+        sparkWork.connect(parentWork, myWork, edgeProp);
+        
+        ReduceSinkOperator r = null;
+        if (parentRS.getConf().getOutputName() != null) {
+          LOG.debug("Cloning reduce sink for multi-child broadcast edge");
+          // we've already set this one up. Need to clone for the next work.
+          r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
+              (ReduceSinkDesc) parentRS.getConf().clone(), parentRS.getParentOperators());
+          context.clonedReduceSinks.add(r);
+        } else {
+          r = parentRS;
+        }
+        // remember the output name of the reduce sink
+        r.getConf().setOutputName(myWork.getName());
+        context.connectedReduceSinks.add(r);
+      }
+    }
+
+    // remember in case we need to connect additional work later
+    Map<BaseWork, SparkEdgeProperty> linkWorkMap = null;
+    if (context.linkOpWithWorkMap.containsKey(mapJoinOp)) {
+      linkWorkMap = context.linkOpWithWorkMap.get(mapJoinOp);
+    } else {
+      linkWorkMap = new HashMap<BaseWork, SparkEdgeProperty>();
+    }
+    linkWorkMap.put(parentWork, edgeProp);
+    context.linkOpWithWorkMap.put(mapJoinOp, linkWorkMap);
+    
+    List<ReduceSinkOperator> reduceSinks 
+      = context.linkWorkWithReduceSinkMap.get(parentWork);
+    if (reduceSinks == null) {
+      reduceSinks = new ArrayList<ReduceSinkOperator>();
+    }
+    reduceSinks.add(parentRS);
+    context.linkWorkWithReduceSinkMap.put(parentWork, reduceSinks);
+
+    // create the dummy operators
+    List<Operator<?>> dummyOperators = new ArrayList<Operator<?>>();
+
+    // create a new operator: HashTableDummyOperator, which shares the table desc
+    HashTableDummyDesc desc = new HashTableDummyDesc();
+    @SuppressWarnings("unchecked")
+    HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc);
+    TableDesc tbl;
+
+    // need to create the correct table descriptor for key/value
+    RowSchema rowSchema = parentRS.getParentOperators().get(0).getSchema();
+    tbl = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromRowSchema(rowSchema, ""));
+    dummyOp.getConf().setTbl(tbl);
+
+    Map<Byte, List<ExprNodeDesc>> keyExprMap = mapJoinOp.getConf().getKeys();
+    List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
+    StringBuffer keyOrder = new StringBuffer();
+    for (ExprNodeDesc k: keyCols) {
+      keyOrder.append("+");
+    }
+    TableDesc keyTableDesc = PlanUtils.getReduceKeyTableDesc(PlanUtils
+        .getFieldSchemasFromColumnList(keyCols, "mapjoinkey"), keyOrder.toString());
+    mapJoinOp.getConf().setKeyTableDesc(keyTableDesc);
+
+    // let the dummy op be the parent of mapjoin op
+    mapJoinOp.replaceParent(parentRS, dummyOp);
+    List<Operator<? extends OperatorDesc>> dummyChildren =
+      new ArrayList<Operator<? extends OperatorDesc>>();
+    dummyChildren.add(mapJoinOp);
+    dummyOp.setChildOperators(dummyChildren);
+    dummyOperators.add(dummyOp);
+
+    // cut the operator tree so as to not retain connections from the parent RS downstream
+    List<Operator<? extends OperatorDesc>> childOperators = parentRS.getChildOperators();
+    int childIndex = childOperators.indexOf(mapJoinOp);
+    childOperators.remove(childIndex);
+
+    // the "work" needs to know about the dummy operators. They have to be separately initialized
+    // at task startup
+    if (mapJoinWork != null) {
+      for (BaseWork myWork: mapJoinWork) {
+        myWork.addDummyOp(dummyOp);
+      }
+    }
+    if (context.linkChildOpWithDummyOp.containsKey(mapJoinOp)) {
+      for (Operator<?> op: context.linkChildOpWithDummyOp.get(mapJoinOp)) {
+        dummyOperators.add(op);
+      }
+    }
+    context.linkChildOpWithDummyOp.put(mapJoinOp, dummyOperators);
+
+    return true;
+  }
+}
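
For context, this processor is meant to be bound to a graph-walker rule that fires on a ReduceSink feeding a MapJoin. A hedged sketch of that wiring, modeled on the commented-out rule near the end of SparkCompiler in this commit; the exact pattern string and rule name are assumptions, not quoted from the commit:

    // Sketch: registering the processor with the rule-based dispatcher used by the walker.
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    opRules.put(new RuleRegExp("No more walking on ReduceSink-MapJoin",
        ReduceSinkOperator.getOperatorName() + "%" + MapJoinOperator.getOperatorName() + "%"),
        new SparkReduceSinkMapJoinProc());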

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java?rev=1612125&r1=1612124&r2=1612125&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java Sun Jul 20 17:33:05 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.spark.SparkCompiler;
 
 /**
  * TaskCompilerFactory is a factory class to choose the appropriate

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1612125&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java Sun Jul 20 17:33:05 2014
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+/**
+ * GenSparkProcContext maintains information about the tasks and operators 
+ * as we walk the operator tree to break them into SparkTasks.
+ * 
+ * Cloned from GenTezProcContext.
+ *
+ */
+public class GenSparkProcContext implements NodeProcessorCtx {
+  public final ParseContext parseContext;
+  public final HiveConf conf;
+  public final List<Task<MoveWork>> moveTask;
+
+  // rootTasks is the entry point for all generated tasks
+  public final List<Task<? extends Serializable>> rootTasks;
+
+  public final Set<ReadEntity> inputs;
+  public final Set<WriteEntity> outputs;
+
+  // holds the root of the operator tree we're currently processing.
+  // This could be a table scan, but also a join, PTF, etc. (i.e. the
+  // first operator of a reduce task).
+  public Operator<? extends OperatorDesc> currentRootOperator;
+
+  // this is the original parent of the currentRootOperator as we scan
+  // through the graph. A root operator might have multiple parents and
+  // we just use this one to remember where we came from in the current
+  // walk.
+  public Operator<? extends OperatorDesc> parentOfRoot;
+
+  // Spark task we're currently processing
+  public SparkTask currentTask;
+
+  // last work we've processed (in order to hook it up to the current
+  // one.
+  public BaseWork preceedingWork;
+
+  // map that keeps track of the last operator of a task to the work
+  // that follows it. This is used for connecting them later.
+  public final Map<Operator<?>, BaseWork> leafOperatorToFollowingWork;
+
+  // a map that keeps track of work that needs to be linked while
+  // traversing an operator tree
+  public final Map<Operator<?>, Map<BaseWork,SparkEdgeProperty>> linkOpWithWorkMap;
+
+  // a map to keep track of what reduce sinks have to be hooked up to
+  // map join work
+  public final Map<BaseWork, List<ReduceSinkOperator>> linkWorkWithReduceSinkMap;
+
+  // map that says which mapjoin belongs to which work item
+  public final Map<MapJoinOperator, List<BaseWork>> mapJoinWorkMap;
+
+  // a map to keep track of which root generated which work
+  public final Map<Operator<?>, BaseWork> rootToWorkMap;
+
+  // a map to keep track of which child generated which work
+  public final Map<Operator<?>, List<BaseWork>> childToWorkMap;
+
+  // we need to keep the original list of operators in the map join to know
+  // what position in the mapjoin the different parent work items will have.
+  public final Map<MapJoinOperator, List<Operator<?>>> mapJoinParentMap;
+
+  // remember the dummy ops we created
+  public final Map<Operator<?>, List<Operator<?>>> linkChildOpWithDummyOp;
+
+  // used to group dependent tasks for multi table inserts
+  public final DependencyCollectionTask dependencyTask;
+
+  // remember map joins as we encounter them.
+  public final Set<MapJoinOperator> currentMapJoinOperators;
+
+  // used to hook up unions
+  public final Map<Operator<?>, BaseWork> unionWorkMap;
+  public final List<UnionOperator> currentUnionOperators;
+  public final Set<BaseWork> workWithUnionOperators;
+  public final Set<ReduceSinkOperator> clonedReduceSinks;
+
+  // we link filesink that will write to the same final location
+  public final Map<Path, List<FileSinkDesc>> linkedFileSinks;
+  public final Set<FileSinkOperator> fileSinkSet;
+
+  // remember which reducesinks we've already connected
+  public final Set<ReduceSinkOperator> connectedReduceSinks;
+
+  @SuppressWarnings("unchecked")
+  public GenSparkProcContext(HiveConf conf, ParseContext parseContext,
+      List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
+    this.conf = conf;
+    this.parseContext = parseContext;
+    this.moveTask = moveTask;
+    this.rootTasks = rootTasks;
+    this.inputs = inputs;
+    this.outputs = outputs;
+    this.currentTask = (SparkTask) TaskFactory.get(
+         new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+    this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>();
+    this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, SparkEdgeProperty>>();
+    this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
+    this.mapJoinWorkMap = new LinkedHashMap<MapJoinOperator, List<BaseWork>>();
+    this.rootToWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
+    this.childToWorkMap = new LinkedHashMap<Operator<?>, List<BaseWork>>();
+    this.mapJoinParentMap = new LinkedHashMap<MapJoinOperator, List<Operator<?>>>();
+    this.currentMapJoinOperators = new LinkedHashSet<MapJoinOperator>();
+    this.linkChildOpWithDummyOp = new LinkedHashMap<Operator<?>, List<Operator<?>>>();
+    this.dependencyTask = (DependencyCollectionTask)
+        TaskFactory.get(new DependencyCollectionWork(), conf);
+    this.unionWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
+    this.currentUnionOperators = new LinkedList<UnionOperator>();
+    this.workWithUnionOperators = new LinkedHashSet<BaseWork>();
+    this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
+    this.linkedFileSinks = new LinkedHashMap<Path, List<FileSinkDesc>>();
+    this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
+    this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
+
+    rootTasks.add(currentTask);
+  }
+
+}
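
Constructing the context also creates the initial SparkTask via TaskFactory and registers it in rootTasks, so the compiler only instantiates it once per query. A sketch mirroring the call that appears in SparkCompiler.generateTaskTree below; the local variable names are assumptions:

    // Sketch: one context per compilation. The constructor adds the first SparkTask
    // to rootTasks and sets up the bookkeeping maps used during the walk.
    GenSparkProcContext procCtx = new GenSparkProcContext(
        conf, parseContext, moveTasks, rootTasks, inputs, outputs);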

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1612125&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Sun Jul 20 17:33:05 2014
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
+
+/**
+ * GenSparkUtils is a collection of shared helper methods for producing SparkWork.
+ * Cloned from GenTezUtils.
+ * TODO: need to make it fit to Spark
+ */
+public class GenSparkUtils {
+  private static final Log logger = LogFactory.getLog(GenSparkUtils.class.getName());
+
+  // sequence number is used to name vertices (e.g.: Map 1, Reducer 14, ...)
+  private int sequenceNumber = 0;
+
+  // singleton
+  private static GenSparkUtils utils;
+
+  public static GenSparkUtils getUtils() {
+    if (utils == null) {
+      utils = new GenSparkUtils();
+    }
+    return utils;
+  }
+
+  protected GenSparkUtils() {
+  }
+
+  public void resetSequenceNumber() {
+    sequenceNumber = 0;
+  }
+
+  public UnionWork createUnionWork(GenSparkProcContext context, Operator<?> operator, SparkWork sparkWork) {
+    UnionWork unionWork = new UnionWork("Union "+ (++sequenceNumber));
+    context.unionWorkMap.put(operator, unionWork);
+    sparkWork.add(unionWork);
+    return unionWork;
+  }
+
+  public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root, SparkWork sparkWork) {
+    assert !root.getParentOperators().isEmpty();
+
+    boolean isAutoReduceParallelism =
+        context.conf.getBoolVar(HiveConf.ConfVars.TEZ_AUTO_REDUCER_PARALLELISM);
+
+    float maxPartitionFactor =
+        context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MAX_PARTITION_FACTOR);
+    float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
+    long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+
+    ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
+    logger.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
+    reduceWork.setReducer(root);
+    reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
+
+    // All parents should be reduce sinks. We pick the one we just walked
+    // to choose the number of reducers. In the join/union case they will
+    // all be -1. In sort/order case where it matters there will be only
+    // one parent.
+    assert context.parentOfRoot instanceof ReduceSinkOperator;
+    ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
+
+    reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
+
+    if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) {
+      reduceWork.setAutoReduceParallelism(true);
+
+      // configured limit for reducers
+      int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+
+      // min we allow spark to pick
+      int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers() 
+        * minPartitionFactor));
+      minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
+
+      // max we allow spark to pick
+      int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor); 
+      maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
+
+      reduceWork.setMinReduceTasks(minPartition);
+      reduceWork.setMaxReduceTasks(maxPartition);
+    }
+
+    setupReduceSink(context, reduceWork, reduceSink);
+
+    sparkWork.add(reduceWork);
+
+    SparkEdgeProperty edgeProp;
+    if (reduceWork.isAutoReduceParallelism()) {
+      edgeProp =
+          new SparkEdgeProperty(0);
+    } else {
+      edgeProp = new SparkEdgeProperty(0);
+    }
+
+    sparkWork.connect(
+        context.preceedingWork,
+        reduceWork, edgeProp);
+    context.connectedReduceSinks.add(reduceSink);
+
+    return reduceWork;
+  }
+
+  protected void setupReduceSink(GenSparkProcContext context, ReduceWork reduceWork,
+      ReduceSinkOperator reduceSink) {
+
+    logger.debug("Setting up reduce sink: " + reduceSink
+        + " with following reduce work: " + reduceWork.getName());
+
+    // need to fill in information about the key and value in the reducer
+    GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
+
+    // remember which parent belongs to which tag
+    reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
+         context.preceedingWork.getName());
+
+    // remember the output name of the reduce sink
+    reduceSink.getConf().setOutputName(reduceWork.getName());
+  }
+
+  public MapWork createMapWork(GenSparkProcContext context, Operator<?> root,
+      SparkWork sparkWork, PrunedPartitionList partitions) throws SemanticException {
+    assert root.getParentOperators().isEmpty();
+    MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
+    logger.debug("Adding map work (" + mapWork.getName() + ") for " + root);
+
+    // map work starts with table scan operators
+    assert root instanceof TableScanOperator;
+    String alias = ((TableScanOperator)root).getConf().getAlias();
+
+    setupMapWork(mapWork, context, partitions, root, alias);
+
+    // add new item to the Spark work
+    sparkWork.add(mapWork);
+
+    return mapWork;
+  }
+
+  // this method's main purpose is to make this class easier to unit test
+  protected void setupMapWork(MapWork mapWork, GenSparkProcContext context,
+      PrunedPartitionList partitions, Operator<? extends OperatorDesc> root,
+      String alias) throws SemanticException {
+    // All the setup is done in GenMapRedUtils
+    GenMapRedUtils.setMapWork(mapWork, context.parseContext,
+        context.inputs, partitions, root, alias, context.conf, false);
+  }
+
+  // removes any union operator and clones the plan
+  public void removeUnionOperators(Configuration conf, GenSparkProcContext context,
+      BaseWork work)
+    throws SemanticException {
+
+    Set<Operator<?>> roots = work.getAllRootOperators();
+    if (work.getDummyOps() != null) {
+      roots.addAll(work.getDummyOps());
+    }
+
+    // need to clone the plan.
+    Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+
+    // we're cloning the operator plan but we're retaining the original work. That means
+    // that root operators have to be replaced with the cloned ops. The replacement map
+    // tells you what that mapping is.
+    Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
+
+    // there's some special handling for dummyOps required. Mapjoins won't be properly
+    // initialized if their dummy parents aren't initialized. Since we cloned the plan
+    // we need to replace the dummy operators in the work with the cloned ones.
+    List<HashTableDummyOperator> dummyOps = new LinkedList<HashTableDummyOperator>();
+
+    Iterator<Operator<?>> it = newRoots.iterator();
+    for (Operator<?> orig: roots) {
+      Operator<?> newRoot = it.next();
+      if (newRoot instanceof HashTableDummyOperator) {
+        dummyOps.add((HashTableDummyOperator)newRoot);
+        it.remove();
+      } else {
+        replacementMap.put(orig,newRoot);
+      }
+    }
+
+    // now we remove all the unions. we throw away any branch that's not reachable from
+    // the current set of roots. The reason is that those branches will be handled in
+    // different tasks.
+    Deque<Operator<?>> operators = new LinkedList<Operator<?>>();
+    operators.addAll(newRoots);
+
+    Set<Operator<?>> seen = new HashSet<Operator<?>>();
+
+    while(!operators.isEmpty()) {
+      Operator<?> current = operators.pop();
+      seen.add(current);
+
+      if (current instanceof FileSinkOperator) {
+        FileSinkOperator fileSink = (FileSinkOperator)current;
+
+        // remember it for additional processing later
+        context.fileSinkSet.add(fileSink);
+
+        FileSinkDesc desc = fileSink.getConf();
+        Path path = desc.getDirName();
+        List<FileSinkDesc> linked;
+
+        if (!context.linkedFileSinks.containsKey(path)) {
+          linked = new ArrayList<FileSinkDesc>();
+          context.linkedFileSinks.put(path, linked);
+        }
+        linked = context.linkedFileSinks.get(path);
+        linked.add(desc);
+
+        desc.setDirName(new Path(path, ""+linked.size()));
+        desc.setLinkedFileSinkDesc(linked);
+      }
+
+      if (current instanceof UnionOperator) {
+        Operator<?> parent = null;
+        int count = 0;
+
+        for (Operator<?> op: current.getParentOperators()) {
+          if (seen.contains(op)) {
+            ++count;
+            parent = op;
+          }
+        }
+
+        // we should have been able to reach the union from only one side.
+        assert count <= 1;
+
+        if (parent == null) {
+          // root operator is union (can happen in reducers)
+          replacementMap.put(current, current.getChildOperators().get(0));
+        } else {
+          parent.removeChildAndAdoptItsChildren(current);
+        }
+      }
+
+      if (current instanceof FileSinkOperator
+          || current instanceof ReduceSinkOperator) {
+        current.setChildOperators(null);
+      } else {
+        operators.addAll(current.getChildOperators());
+      }
+    }
+    work.setDummyOps(dummyOps);
+    work.replaceRoots(replacementMap);
+  }
+
+  public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSink)
+      throws SemanticException {
+
+    ParseContext parseContext = context.parseContext;
+
+    boolean isInsertTable = // is INSERT OVERWRITE TABLE
+        GenMapRedUtils.isInsertInto(parseContext, fileSink);
+    HiveConf hconf = parseContext.getConf();
+
+    boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
+        hconf, fileSink, context.currentTask, isInsertTable);
+
+    Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+        chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
+
+    if (chDir) {
+      // Merge the files in the destination table/partitions by creating Map-only merge job
+      // If underlying data is RCFile a RCFileBlockMerge task would be created.
+      logger.info("using CombineHiveInputformat for the merge job");
+      GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
+          context.dependencyTask, context.moveTask,
+          hconf, context.currentTask);
+    }
+
+    FetchTask fetchTask = parseContext.getFetchTask();
+    if (fetchTask != null && context.currentTask.getNumChild() == 0) {
+      if (fetchTask.isFetchFrom(fileSink.getConf())) {
+        context.currentTask.setFetchSource(true);
+      }
+    }
+  }
+}
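
The shared instance is obtained via getUtils(), and the sequence counter is reset once per compilation so vertex names restart at 1. A minimal sketch of compiler-side usage; the context, tableScanOp, reduceRootOp and sparkWork variables are assumed to come from the surrounding walk:

    // Sketch: create vertices through the shared utils instance.
    GenSparkUtils utils = GenSparkUtils.getUtils();
    utils.resetSequenceNumber();   // vertex names restart at "Map 1" / "Reducer 1"
    MapWork mapWork = utils.createMapWork(context, tableScanOp, sparkWork, null);
    ReduceWork reduceWork = utils.createReduceWork(context, reduceRootOp, sparkWork);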

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1612125&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Sun Jul 20 17:33:05 2014
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
+
+/**
+ * GenSparkWork separates the operator tree into Spark tasks.
+ * It is called once per leaf operator (an operator that forces a new execution unit)
+ * and breaks the operators into work and tasks along the way.
+ * 
+ * Cloned from GenTezWork.
+ * 
+ * TODO: need to go thru this to make it fit completely to Spark.
+ */
+public class GenSparkWork implements NodeProcessor {
+  static final private Log LOG = LogFactory.getLog(GenSparkWork.class.getName());
+
+  // instance of shared utils
+  private GenSparkUtils utils = null;
+
+  /**
+   * Constructor takes utils as parameter to facilitate testing
+   */
+  public GenSparkWork(GenSparkUtils utils) {
+    this.utils = utils;
+  }
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack,
+      NodeProcessorCtx procContext, Object... nodeOutputs) throws SemanticException {
+    GenSparkProcContext context = (GenSparkProcContext) procContext;
+
+    assert context != null && context.currentTask != null
+        && context.currentRootOperator != null;
+
+    // Operator is a file sink or reduce sink. Something that forces
+    // a new vertex.
+    Operator<?> operator = (Operator<?>) nd;
+
+    // root is the start of the operator pipeline we're currently
+    // packing into a vertex, typically a table scan, union or join
+    Operator<?> root = context.currentRootOperator;
+
+    LOG.debug("Root operator: " + root);
+    LOG.debug("Leaf operator: " + operator);
+
+    if (context.clonedReduceSinks.contains(operator)) {
+      // if we're visiting a terminal we've created ourselves,
+      // just skip and keep going
+      return null;
+    }
+
+    SparkWork sparkWork = context.currentTask.getWork();
+
+    // Right now the work graph is pretty simple. If there is no
+    // preceding work we have a root and will generate a map
+    // vertex. If there is preceding work we will generate
+    // a reduce vertex.
+    BaseWork work;
+    if (context.rootToWorkMap.containsKey(root)) {
+      // having seen the root operator before means there was a branch in the
+      // operator graph. There's typically two reasons for that: a) mux/demux
+      // b) multi insert. Mux/Demux will hit the same leaf again, multi insert
+      // will result into a vertex with multiple FS or RS operators.
+
+      // At this point we don't have to do anything special in this case. Just
+      // run through the regular paces w/o creating a new task.
+      work = context.rootToWorkMap.get(root);
+    } else {
+      // create a new vertex
+      if (context.preceedingWork == null) {
+        work = utils.createMapWork(context, root, sparkWork, null);
+      } else {
+        work = utils.createReduceWork(context, root, sparkWork);
+      }
+      context.rootToWorkMap.put(root, work);
+    }
+
+    if (!context.childToWorkMap.containsKey(operator)) {
+      List<BaseWork> workItems = new LinkedList<BaseWork>();
+      workItems.add(work);
+      context.childToWorkMap.put(operator, workItems);
+    } else {
+      context.childToWorkMap.get(operator).add(work);
+    }
+
+    // remember which mapjoin operator links with which work
+    if (!context.currentMapJoinOperators.isEmpty()) {
+      for (MapJoinOperator mj: context.currentMapJoinOperators) {
+        LOG.debug("Processing map join: " + mj);
+        // remember the mapping in case we scan another branch of the
+        // mapjoin later
+        if (!context.mapJoinWorkMap.containsKey(mj)) {
+          List<BaseWork> workItems = new LinkedList<BaseWork>();
+          workItems.add(work);
+          context.mapJoinWorkMap.put(mj, workItems);
+        } else {
+          context.mapJoinWorkMap.get(mj).add(work);
+        }
+
+        /*
+         * this happens in case of map join operations.
+         * The tree looks like this:
+         *
+         *        RS <--- we are here perhaps
+         *        |
+         *     MapJoin
+         *     /     \
+         *   RS       TS
+         *  /
+         * TS
+         *
+         * If we are at the RS pointed above, and we may have already visited the
+         * RS following the TS, we have already generated work for the TS-RS.
+         * We need to hook the current work to this generated work.
+         */
+        if (context.linkOpWithWorkMap.containsKey(mj)) {
+          Map<BaseWork, SparkEdgeProperty> linkWorkMap = context.linkOpWithWorkMap.get(mj);
+          if (linkWorkMap != null) {
+            if (context.linkChildOpWithDummyOp.containsKey(mj)) {
+              for (Operator<?> dummy: context.linkChildOpWithDummyOp.get(mj)) {
+                work.addDummyOp((HashTableDummyOperator) dummy);
+              }
+            }
+            for (Entry<BaseWork,SparkEdgeProperty> parentWorkMap : linkWorkMap.entrySet()) {
+              BaseWork parentWork = parentWorkMap.getKey();
+              LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
+              SparkEdgeProperty edgeProp = parentWorkMap.getValue();
+              sparkWork.connect(parentWork, work, edgeProp);
+
+              // need to set up output name for reduce sink now that we know the name
+              // of the downstream work
+              for (ReduceSinkOperator r:
+                     context.linkWorkWithReduceSinkMap.get(parentWork)) {
+                if (r.getConf().getOutputName() != null) {
+                  LOG.debug("Cloning reduce sink for multi-child broadcast edge");
+                  // we've already set this one up. Need to clone for the next work.
+                  r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
+                      (ReduceSinkDesc)r.getConf().clone(), r.getParentOperators());
+                  context.clonedReduceSinks.add(r);
+                }
+                r.getConf().setOutputName(work.getName());
+                context.connectedReduceSinks.add(r);
+              }
+            }
+          }
+        }
+      }
+      // clear out the set. we don't need it anymore.
+      context.currentMapJoinOperators.clear();
+    }
+
+    // This is where we cut the tree as described above. We also remember that
+    // we might have to connect parent work with this work later.
+    for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
+      context.leafOperatorToFollowingWork.put(parent, work);
+      LOG.debug("Removing " + parent + " as parent from " + root);
+      root.removeParent(parent);
+    }
+
+    if (!context.currentUnionOperators.isEmpty()) {
+      // if there are union all operators we need to add the work to the set
+      // of union operators.
+
+      UnionWork unionWork;
+      if (context.unionWorkMap.containsKey(operator)) {
+        // we've seen this terminal before and have created a union work object.
+        // just need to add this work to it. There will be no children of this one
+        // since we've passed this operator before.
+        assert operator.getChildOperators().isEmpty();
+        unionWork = (UnionWork) context.unionWorkMap.get(operator);
+
+      } else {
+        // first time through. we need to create a union work object and add this
+        // work to it. Subsequent work should reference the union and not the actual
+        // work.
+        unionWork = utils.createUnionWork(context, operator, sparkWork);
+      }
+
+      // finally hook everything up
+      LOG.debug("Connecting union work ("+unionWork+") with work ("+work+")");
+      SparkEdgeProperty edgeProp = new SparkEdgeProperty(0/*EdgeType.CONTAINS*/);
+      sparkWork.connect(unionWork, work, edgeProp);
+      unionWork.addUnionOperators(context.currentUnionOperators);
+      context.currentUnionOperators.clear();
+      context.workWithUnionOperators.add(work);
+      work = unionWork;
+    }
+
+    // We're scanning a tree from roots to leaf (this is not technically
+    // correct, demux and mux operators might form a diamond shape, but
+    // we will only scan one path and ignore the others, because the
+    // diamond shape is always contained in a single vertex). The scan
+    // is depth first and because we remove parents when we pack a pipeline
+    // into a vertex we will never visit any node twice. But because of that
+    // we might have a situation where we need to connect 'work' that comes after
+    // the 'work' we're currently looking at.
+    //
+    // Also note: the concept of leaf and root is reversed in hive for historical
+    // reasons. Roots are data sources, leaves are data sinks. I know.
+    if (context.leafOperatorToFollowingWork.containsKey(operator)) {
+
+      BaseWork followingWork = context.leafOperatorToFollowingWork.get(operator);
+      long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+
+      LOG.debug("Second pass. Leaf operator: "+operator
+        +" has common downstream work:"+followingWork);
+
+      // need to add this branch to the key + value info
+      assert operator instanceof ReduceSinkOperator
+        && followingWork instanceof ReduceWork;
+      ReduceSinkOperator rs = (ReduceSinkOperator) operator;
+      ReduceWork rWork = (ReduceWork) followingWork;
+      GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
+
+      // remember which parent belongs to which tag
+      rWork.getTagToInput().put(rs.getConf().getTag(), work.getName());
+
+      // remember the output name of the reduce sink
+      rs.getConf().setOutputName(rWork.getName());
+
+      if (!context.connectedReduceSinks.contains(rs)) {
+        // add dependency between the two work items
+        SparkEdgeProperty edgeProp;
+        if (rWork.isAutoReduceParallelism()) {
+          edgeProp =
+              new SparkEdgeProperty(0/*context.conf, EdgeType.SIMPLE_EDGE, true,
+                  rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer*/);
+        } else {
+          edgeProp = new SparkEdgeProperty(0/*EdgeType.SIMPLE_EDGE*/);
+        }
+        sparkWork.connect(work, rWork, edgeProp);
+        context.connectedReduceSinks.add(rs);
+      }
+    } else {
+      LOG.debug("First pass. Leaf operator: "+operator);
+    }
+
+    // No children means we're at the bottom. If there are more operators to scan
+    // the next item will be a new root.
+    if (!operator.getChildOperators().isEmpty()) {
+      assert operator.getChildOperators().size() == 1;
+      context.parentOfRoot = operator;
+      context.currentRootOperator = operator.getChildOperators().get(0);
+      context.preceedingWork = work;
+    }
+
+    return null;
+  }
+}
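
GenSparkWork is registered against the operators that terminate a pipeline. The ReduceSink rule is visible in SparkCompiler below; the FileSink rule sketched here is an assumed analogue, not quoted from this commit:

    // Sketch: binding GenSparkWork to pipeline-ending operators.
    GenSparkWork genSparkWork = new GenSparkWork(GenSparkUtils.getUtils());
    opRules.put(new RuleRegExp("Split Work - ReduceSink",
        ReduceSinkOperator.getOperatorName() + "%"), genSparkWork);
    // Assumed analogue of the ReduceSink rule above.
    opRules.put(new RuleRegExp("Split Work - FileSink",
        FileSinkOperator.getOperatorName() + "%"), genSparkWork);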

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java?rev=1612125&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java Sun Jul 20 17:33:05 2014
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/**
+ * Walks the operator tree in DFS fashion.
+ * 
+ * Cloned from GenTezWorkWalker.
+ */
+public class GenSparkWorkWalker extends DefaultGraphWalker {
+  private final GenSparkProcContext ctx;
+
+  /**
+   * constructor of the walker - the dispatcher is passed.
+   *
+   * @param disp the dispatcher to be called for each node visited
+   * @param ctx the context where we'll set the current root operator
+   *
+   */
+  public GenSparkWorkWalker(Dispatcher disp, GenSparkProcContext ctx) {
+    super(disp);
+    this.ctx = ctx;
+  }
+
+  private void setRoot(Node nd) {
+    ctx.currentRootOperator = (Operator<? extends OperatorDesc>) nd;
+    ctx.preceedingWork = null;
+    ctx.parentOfRoot = null;
+  }
+
+  /**
+   * starting point for walking.
+   *
+   * @throws SemanticException
+   */
+  @Override
+  public void startWalking(Collection<Node> startNodes, HashMap<Node, Object> nodeOutput)
+      throws SemanticException {
+    toWalk.addAll(startNodes);
+    while (toWalk.size() > 0) {
+      Node nd = toWalk.remove(0);
+      setRoot(nd);
+      walk(nd);
+      if (nodeOutput != null) {
+        nodeOutput.put(nd, retMap.get(nd));
+      }
+    }
+  }
+
+  /**
+   * Walk the given operator.
+   *
+   * @param nd operator being walked
+   */
+  @Override
+  public void walk(Node nd) throws SemanticException {
+    List<? extends Node> children = nd.getChildren();
+
+    // maintain the stack of operators encountered
+    opStack.push(nd);
+    Boolean skip = dispatchAndReturn(nd, opStack);
+
+    // save some positional state
+    Operator<? extends OperatorDesc> currentRoot = ctx.currentRootOperator;
+    Operator<? extends OperatorDesc> parentOfRoot = ctx.parentOfRoot;
+    BaseWork preceedingWork = ctx.preceedingWork;
+
+    if (skip == null || !skip) {
+      // move all the children to the front of queue
+      for (Node ch : children) {
+
+        // and restore the state before walking each child
+        ctx.currentRootOperator = currentRoot;
+        ctx.parentOfRoot = parentOfRoot;
+        ctx.preceedingWork = preceedingWork;
+
+        walk(ch);
+      }
+    }
+
+    // done with this operator
+    opStack.pop();
+  }
+
+}
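
The walker drives the registered processors over the query's top operators, resetting the per-root context state before each walk. A hedged sketch of the dispatch loop, assuming opRules and procCtx are set up as in the sketches above and pCtx is the ParseContext:

    // Sketch: dispatch the rules over the operator DAG rooted at the table scans.
    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
    GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
    List<Node> topNodes = new ArrayList<Node>();
    topNodes.addAll(pCtx.getTopOps().values());
    ogw.startWalking(topNodes, null);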

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1612125&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Sun Jul 20 17:33:05 2014
@@ -0,0 +1,254 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
+import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TaskCompiler;
+
+/**
+ * SparkCompiler translates the operator plan into SparkTasks.
+ * 
+ * Pretty much cloned from TezCompiler.
+ * 
+ * TODO: need to complete this and make it fit Spark.
+ */
+public class SparkCompiler extends TaskCompiler {
+  private static final Log logger = LogFactory.getLog(SparkCompiler.class);
+
+  public SparkCompiler() {
+  }
+
+  @Override
+  public void init(HiveConf conf, LogHelper console, Hive db) {
+    super.init(conf, console, db);
+    
+//    TODO: Need to check if we require the use of recursive input dirs for union processing
+//    conf.setBoolean("mapred.input.dir.recursive", true);
+//    HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
+  }
+
+  @Override
+  protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs) throws SemanticException {
+    // TODO: need to add spark specific optimization.
+/*
+    // Sequence of TableScan operators to be walked
+    Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+    deque.addAll(pCtx.getTopOps().values());
+
+    // Create the context for the walker
+    OptimizeSparkProcContext procCtx
+      = new OptimizeSparkProcContext(conf, pCtx, inputs, outputs, deque);
+
+    // create a walker which walks the tree in a DFS manner while maintaining
+    // the operator stack.
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
+        ReduceSinkOperator.getOperatorName() + "%"),
+        new SetReducerParallelism());
+
+    opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
+        JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    GraphWalker ogw = new ForwardWalker(disp);
+    ogw.startWalking(topNodes, null);
+*/
+  }
+
+  /**
+   * TODO: need to turn on the rules that are commented out and add more if necessary.
+   */
+  @Override
+  protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
+      List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
+      throws SemanticException {
+    GenSparkUtils.getUtils().resetSequenceNumber();
+
+    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
+    GenSparkWork genSparkWork = new GenSparkWork(GenSparkUtils.getUtils());
+
+    GenSparkProcContext procCtx = new GenSparkProcContext(
+        conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
+
+    // create a walker which walks the tree in a DFS manner while maintaining
+    // the operator stack. The dispatcher generates the plan from the operator tree
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("Split Work - ReduceSink",
+        ReduceSinkOperator.getOperatorName() + "%"), genSparkWork);
+
+//    opRules.put(new RuleRegExp("No more walking on ReduceSink-MapJoin",
+//        MapJoinOperator.getOperatorName() + "%"), new ReduceSinkMapJoinProc());
+
+    opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink",
+        FileSinkOperator.getOperatorName() + "%"),
+        new CompositeProcessor(new SparkFileSinkProcessor(), genSparkWork));
+
+//    opRules.put(new RuleRegExp("Handle Potential Analyze Command",
+//        TableScanOperator.getOperatorName() + "%"),
+//        new ProcessAnalyzeTable(GenSparkUtils.getUtils()));
+
+//    opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
+//        new NodeProcessor() {
+//      @Override
+//      public Object process(Node n, Stack<Node> s,
+//          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+//        GenSparkProcContext context = (GenSparkProcContext) procCtx;
+//        UnionOperator union = (UnionOperator) n;
+//
+//        // simply need to remember that we've seen a union.
+//        context.currentUnionOperators.add(union);
+//        return null;
+//      }
+//    });
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
+    ogw.startWalking(topNodes, null);
+
+    // we need to clone some operator plans and remove union operators still
+    for (BaseWork w: procCtx.workWithUnionOperators) {
+      GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);
+    }
+
+    // finally make sure the file sink operators are set up right
+    for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
+      GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
+    }
+  }
+
+  @Override
+  protected void setInputFormat(Task<? extends Serializable> task) {
+    if (task instanceof SparkTask) {
+      SparkWork work = ((SparkTask)task).getWork();
+      List<BaseWork> all = work.getAllWork();
+      for (BaseWork w: all) {
+        if (w instanceof MapWork) {
+          MapWork mapWork = (MapWork) w;
+          HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork();
+          if (!opMap.isEmpty()) {
+            for (Operator<? extends OperatorDesc> op : opMap.values()) {
+              setInputFormat(mapWork, op);
+            }
+          }
+        }
+      }
+    } else if (task instanceof ConditionalTask) {
+      List<Task<? extends Serializable>> listTasks
+        = ((ConditionalTask) task).getListTasks();
+      for (Task<? extends Serializable> tsk : listTasks) {
+        setInputFormat(tsk);
+      }
+    }
+
+    if (task.getChildTasks() != null) {
+      for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+        setInputFormat(childTask);
+      }
+    }
+  }
+
+  private void setInputFormat(MapWork work, Operator<? extends OperatorDesc> op) {
+    if (op.isUseBucketizedHiveInputFormat()) {
+      work.setUseBucketizedHiveInputFormat(true);
+      return;
+    }
+
+    if (op.getChildOperators() != null) {
+      for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
+        setInputFormat(work, childOp);
+      }
+    }
+  }
+
+  @Override
+  protected void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
+      GlobalLimitCtx globalLimitCtx) throws SemanticException {
+    // currently all Spark work is on the cluster
+    return;
+  }
+
+  @Override
+  protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
+      Context ctx) throws SemanticException {
+    PhysicalContext physicalCtx = new PhysicalContext(conf, pCtx, pCtx.getContext(), rootTasks,
+       pCtx.getFetchTask());
+
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_CHECK_CROSS_PRODUCT)) {
+      physicalCtx = new CrossProductCheck().resolve(physicalCtx);
+    }
+
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+      (new Vectorizer()).resolve(physicalCtx);
+    }
+    if (!"none".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE))) {
+      (new StageIDsRearranger()).resolve(physicalCtx);
+    }
+    return;
+  }
+
+}
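
A note on the rules left commented out in generateTaskTree(): the analyze-table rule still points at Tez's ProcessAnalyzeTable, while this commit also adds SparkProcessAnalyzeTable. Assuming the same rule name and an import of TableScanOperator, enabling it with the Spark variant would presumably look like the sketch below (illustration only, not part of this commit):

    // Sketch only: uses SparkProcessAnalyzeTable (added below) in place of the
    // Tez ProcessAnalyzeTable referenced in the commented-out rule; assumes
    // org.apache.hadoop.hive.ql.exec.TableScanOperator is imported.
    opRules.put(new RuleRegExp("Handle Potential Analyze Command",
        TableScanOperator.getOperatorName() + "%"),
        new SparkProcessAnalyzeTable(GenSparkUtils.getUtils()));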

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompilerOld.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompilerOld.java?rev=1612125&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompilerOld.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompilerOld.java Sun Jul 20 17:33:05 2014
@@ -0,0 +1,129 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
+import org.apache.hadoop.hive.ql.parse.MapReduceCompiler;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+/**
+ * SparkCompilerOld translates the operator plan into a SparkTask.
+ * TODO: currently extends MapReduceCompiler in order to make the POC work; the
+ *       real compiler will stand alone, parallel to MapReduceCompiler.
+ * TODO: remove this class.
+ */
+public class SparkCompilerOld extends MapReduceCompiler {
+  private final Log logger = LogFactory.getLog(SparkCompilerOld.class);
+
+  public SparkCompilerOld() {
+  }
+
+  @Override
+  public void init(HiveConf conf, LogHelper console, Hive db) {
+    super.init(conf, console, db);
+
+    // Any Spark specific configuration
+    // We require the use of recursive input dirs for union processing
+//    conf.setBoolean("mapred.input.dir.recursive", true);
+//    HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
+  }
+
+  @Override
+  protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs) throws SemanticException {
+    // TODO: add optimization that's related to Spark
+  }
+
+  private static int counter = 0;
+  
+  @Override
+  protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
+      List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
+      throws SemanticException {
+    super.generateTaskTree(rootTasks, pCtx, mvTask, inputs, outputs);
+
+    MapRedTask mrTask = (MapRedTask) rootTasks.get(0);
+    MapWork mapWork = mrTask.getWork().getMapWork();
+    ReduceWork redWork = mrTask.getWork().getReduceWork();
+    SparkWork sparkWork = new SparkWork("first spark #" + counter++);
+    sparkWork.setMapWork(mapWork);
+    if (redWork != null) {
+      sparkWork.setReduceWork(redWork);
+    }
+    SparkTask task = new SparkTask();
+    task.setWork(sparkWork);
+    task.setId(sparkWork.getName());
+    rootTasks.clear();
+    rootTasks.add(task);
+
+    // finally make sure the file sink operators are set up right
+    breakTaskTree(task);
+  }
+
+  private void breakTaskTree(Task<? extends Serializable> task) {
+    if (task instanceof SparkTask) {
+      SparkTask st = (SparkTask) task;
+      SparkWork sw = st.getWork();
+      MapWork mw = sw.getMapWork();
+      HashMap<String, Operator<? extends OperatorDesc>> opMap = mw.getAliasToWork();
+      if (!opMap.isEmpty()) {
+        for (Operator<? extends OperatorDesc> op : opMap.values()) {
+          breakOperatorTree(op);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
+      GlobalLimitCtx globalLimitCtx)
+      throws SemanticException {
+    // currently all Spark work is on the cluster
+    return;
+  }
+
+  @Override
+  protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
+      Context ctx) throws SemanticException {
+  }
+
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java?rev=1612125&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java Sun Jul 20 17:33:05 2014
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * SparkFileSinkProcessor handles the addition of merge, move and stats tasks for file sinks.
+ * Cloned from Tez's FileSinkProcessor.
+ */
+public class SparkFileSinkProcessor implements NodeProcessor {
+  private static final Log logger = LogFactory.getLog(SparkFileSinkProcessor.class.getName());
+
+  /*
+   * (non-Javadoc)
+   * We should ideally not modify the tree we traverse; since all this processor
+   * does is remember the file sink for later processing, the plan itself is
+   * rewritten only after the walk, in SparkCompiler.generateTaskTree().
+   */
+  @Override
+  public Object process(Node nd, Stack<Node> stack,
+      NodeProcessorCtx procCtx, Object... nodeOutputs)
+      throws SemanticException {
+
+    GenSparkProcContext context = (GenSparkProcContext) procCtx;
+    FileSinkOperator fileSink = (FileSinkOperator) nd;
+
+    // just remember it for later processing
+    context.fileSinkSet.add(fileSink);
+    return true;
+  }
+
+}
\ No newline at end of file
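
SparkFileSinkProcessor deliberately defers any plan rewriting: during the walk it only records the operator, and SparkCompiler.generateTaskTree() finishes the job once the walk is over. Put side by side, using the names from this commit, the two halves of that pattern are roughly:

    // phase 1 -- inside SparkFileSinkProcessor.process(): just remember the sink
    context.fileSinkSet.add(fileSink);

    // phase 2 -- back in SparkCompiler.generateTaskTree(), after startWalking()
    // returns, it is safe to rewrite the plan around each recorded sink
    for (FileSinkOperator fileSink : procCtx.fileSinkSet) {
      GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
    }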

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java?rev=1612125&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java Sun Jul 20 17:33:05 2014
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.lang.StringBuffer;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.QBParseInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
+import org.apache.hadoop.mapred.InputFormat;
+
+/**
+ * SparkProcessAnalyzeTable sets up work for the several variants of ANALYZE TABLE
+ * (normal, no scan, partial scan). The plan at this point will be a single
+ * table scan operator.
+ *
+ * TODO: cloned from Tez's ProcessAnalyzeTable. Need to make sure it fits Spark.
+ */
+public class SparkProcessAnalyzeTable implements NodeProcessor {
+  private static final Log logger = LogFactory.getLog(SparkProcessAnalyzeTable.class.getName());
+
+  // shared plan utils for spark
+  private GenSparkUtils utils = null;
+
+  /**
+   * Injecting the utils in the constructor facilitates testing.
+   */
+  public SparkProcessAnalyzeTable(GenSparkUtils utils) {
+    this.utils = utils;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object process(Node nd, Stack<Node> stack,
+      NodeProcessorCtx procContext, Object... nodeOutputs) throws SemanticException {
+    GenSparkProcContext context = (GenSparkProcContext) procContext;
+    
+    TableScanOperator tableScan = (TableScanOperator) nd;
+
+    ParseContext parseContext = context.parseContext;
+    Class<? extends InputFormat> inputFormat = parseContext.getTopToTable().get(tableScan)
+        .getInputFormatClass();
+    QB queryBlock = parseContext.getQB();
+    QBParseInfo parseInfo = parseContext.getQB().getParseInfo();
+    
+    if (parseInfo.isAnalyzeCommand()) {
+      assert tableScan.getChildOperators() == null || tableScan.getChildOperators().size() == 0;
+
+      String alias = null;
+      for (String a: parseContext.getTopOps().keySet()) {
+        if (tableScan == parseContext.getTopOps().get(a)) {
+          alias = a;
+        }
+      }
+      assert alias != null;
+
+      SparkWork sparkWork = context.currentTask.getWork();
+      boolean partialScan = parseInfo.isPartialScanAnalyzeCommand();
+      boolean noScan = parseInfo.isNoScanAnalyzeCommand();
+      if (inputFormat.equals(OrcInputFormat.class) && (noScan || partialScan)) {
+
+        // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS partialscan;
+        // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
+        // There will not be any Spark job above this task
+        StatsNoJobWork snjWork = new StatsNoJobWork(parseContext.getQB().getParseInfo().getTableSpec());
+        snjWork.setStatsReliable(parseContext.getConf().getBoolVar(
+            HiveConf.ConfVars.HIVE_STATS_RELIABLE));
+        Task<StatsNoJobWork> snjTask = TaskFactory.get(snjWork, parseContext.getConf());
+        snjTask.setParentTasks(null);
+        context.rootTasks.remove(context.currentTask);
+        context.rootTasks.add(snjTask);
+        return true;
+      } else {
+
+        // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS;
+        // The plan consists of a simple SparkTask followed by a StatsTask.
+        // The Spark task is just a simple TableScanOperator
+
+        StatsWork statsWork = new StatsWork(parseInfo.getTableSpec());
+        statsWork.setAggKey(tableScan.getConf().getStatsAggPrefix());
+        statsWork.setSourceTask(context.currentTask);
+        statsWork.setStatsReliable(parseContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
+        Task<StatsWork> statsTask = TaskFactory.get(statsWork, parseContext.getConf());
+        context.currentTask.addDependentTask(statsTask);
+
+        // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
+        // The plan consists of a StatsTask only.
+        if (parseInfo.isNoScanAnalyzeCommand()) {
+          statsTask.setParentTasks(null);
+          statsWork.setNoScanAnalyzeCommand(true);
+          context.rootTasks.remove(context.currentTask);
+          context.rootTasks.add(statsTask);
+        }
+
+        // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS partialscan;
+        if (parseInfo.isPartialScanAnalyzeCommand()) {
+          handlePartialScanCommand(tableScan, parseContext, parseInfo, statsWork, context, statsTask);
+        }
+
+        // NOTE: here we should use the new partition predicate pushdown API to get the pruned
+        // list of partitions, and pass it to setTaskPlan as the last parameter
+        Set<Partition> confirmedPartns = GenMapRedUtils.getConfirmedPartitionsForScan(parseInfo);
+        PrunedPartitionList partitions = null;
+        if (confirmedPartns.size() > 0) {
+          Table source = queryBlock.getMetaData().getTableForAlias(alias);
+          List<String> partCols = GenMapRedUtils.getPartitionColumns(parseInfo);
+          partitions = new PrunedPartitionList(source, confirmedPartns, partCols, false);
+        }
+
+        MapWork w = utils.createMapWork(context, tableScan, sparkWork, partitions);
+        w.setGatheringStats(true);
+        return true;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Handle the partial scan command.
+   *
+   * The generated plan is a PartialScanTask followed by a StatsTask.
+   */
+  private void handlePartialScanCommand(TableScanOperator tableScan, ParseContext parseContext, 
+      QBParseInfo parseInfo, StatsWork statsWork, GenSparkProcContext context,
+      Task<StatsWork> statsTask) throws SemanticException {
+    String aggregationKey = tableScan.getConf().getStatsAggPrefix();
+    StringBuffer aggregationKeyBuffer = new StringBuffer(aggregationKey);
+    List<Path> inputPaths = GenMapRedUtils.getInputPathsForPartialScan(parseInfo, aggregationKeyBuffer);
+    aggregationKey = aggregationKeyBuffer.toString();
+    
+    // scan work
+    PartialScanWork scanWork = new PartialScanWork(inputPaths);
+    scanWork.setMapperCannotSpanPartns(true);
+    scanWork.setAggKey(aggregationKey);
+
+    // stats work
+    statsWork.setPartialScanAnalyzeCommand(true);
+
+    // partial scan task
+    DriverContext driverCxt = new DriverContext();
+    Task<PartialScanWork> partialScanTask = TaskFactory.get(scanWork, parseContext.getConf());
+    partialScanTask.initialize(parseContext.getConf(), null, driverCxt);
+    partialScanTask.setWork(scanWork);
+    statsWork.setSourceTask(partialScanTask);
+
+    // task dependency
+    context.rootTasks.remove(context.currentTask);
+    context.rootTasks.add(partialScanTask);
+    partialScanTask.addDependentTask(statsTask);
+  }
+
+}
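
For readers comparing this with the Tez version, the branches in process() boil down to four task shapes. The class below is purely hypothetical (not part of the commit) and only summarizes the decisions made above:

  // Hypothetical, self-contained summary of the task shapes produced by process();
  // for illustration only.
  class AnalyzePlanShapes {
    enum AnalyzeVariant { FULL_SCAN, NO_SCAN, PARTIAL_SCAN }

    static String planShape(AnalyzeVariant variant, boolean orcInput) {
      if (orcInput && variant != AnalyzeVariant.FULL_SCAN) {
        return "StatsNoJobTask";                      // ORC metadata suffices; no job at all
      }
      switch (variant) {
        case NO_SCAN:
          return "StatsTask";                         // stats task becomes the new root
        case PARTIAL_SCAN:
          return "PartialScanTask -> StatsTask";      // partial scan feeds the stats task
        default:
          return "SparkTask(TableScan) -> StatsTask"; // full scan, then aggregate stats
      }
    }
  }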

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java?rev=1612125&r1=1612124&r2=1612125&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java Sun Jul 20 17:33:05 2014
@@ -25,38 +25,45 @@ public class SparkEdgeProperty {
   public static long SHUFFLE_GROUP = 1; // Shuffle, keys are coming together
   public static long SHUFFLE_SORT = 2;  // Shuffle, keys are sorted
 
-  private long value;
+  private long edgeType;
   
-  public SparkEdgeProperty(long value) {
-    this.value = value;
+  private int numPartitions;
+  
+  public SparkEdgeProperty(long edgeType, int numPartitions) {
+    this.edgeType = edgeType;
+    this.numPartitions = numPartitions;
   }
   
+  public SparkEdgeProperty(long edgeType) {
+    this.edgeType = edgeType;
+  }
+
   public boolean isShuffleNone() {
-    return value == SHUFFLE_NONE;
+    return edgeType == SHUFFLE_NONE;
   }
   
   public void setShuffleNone() {
-    value = SHUFFLE_NONE;
+    edgeType = SHUFFLE_NONE;
   }
 
   public boolean isShuffleGroup() {
-    return (value & SHUFFLE_GROUP) != 0;
+    return (edgeType & SHUFFLE_GROUP) != 0;
   }
   
   public void setShuffleGroup() {
-    value |= SHUFFLE_GROUP;
+    edgeType |= SHUFFLE_GROUP;
   }
   
   public boolean isShuffleSort() {
-    return (value & SHUFFLE_SORT) != 0;
+    return (edgeType & SHUFFLE_SORT) != 0;
   }
 
   public void setShuffleSort() {
-    value |= SHUFFLE_SORT;
+    edgeType |= SHUFFLE_SORT;
   }
   
-  public long getValue() {
-    return value;
+  public long getEdgeType() {
+    return edgeType;
   }
 
   @Explain(displayName = "Shuffle Type")
@@ -80,5 +87,13 @@ public class SparkEdgeProperty {
 
     return sb.toString();
   }
+
+  public int getNumPartitions() {
+    return numPartitions;
+  }
+
+  public void setNumPartitions(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
 }
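
Renaming the bare value field to edgeType and adding numPartitions makes the bitmask nature of the edge explicit. A minimal usage sketch, assuming only the constants and accessors visible in this diff:

    SparkEdgeProperty edge =
        new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_GROUP, 4);
    edge.setShuffleSort();                   // flags combine, since edgeType is a bitmask

    assert edge.isShuffleGroup() && edge.isShuffleSort();
    assert !edge.isShuffleNone();            // SHUFFLE_NONE only when no flag is set
    assert edge.getNumPartitions() == 4;     // number of partitions for the shuffle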
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java?rev=1612125&r1=1612124&r2=1612125&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java Sun Jul 20 17:33:05 2014
@@ -307,8 +307,10 @@ public class SparkWork extends AbstractO
   
   public ReduceWork getReduceWork() {
     Iterator<BaseWork> it = leaves.iterator();
-    if (it.hasNext())
-      return (ReduceWork)leaves.iterator().next();
+    if (it.hasNext()) {
+      BaseWork work = it.next();
+      return (work instanceof ReduceWork) ? (ReduceWork)work : null;
+    }
     return null;
   }
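
The instanceof guard above matters for map-only plans, where the leaf BaseWork is a MapWork rather than a ReduceWork. A small sketch of the calling pattern this enables (illustrative only; sparkWork is assumed to be in scope):

    // Illustrative caller: a map-only SparkWork now yields null here
    // instead of a ClassCastException from the old unconditional cast.
    ReduceWork reduce = sparkWork.getReduceWork();
    if (reduce != null) {
      // configure the reduce side of the job
    } else {
      // map-only plan: nothing to set up for the reduce side
    }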
 


