hive-commits mailing list archives

From: gunt...@apache.org
Subject: svn commit: r1667456 [8/16] - in /hive/branches/llap: ./ beeline/src/java/org/apache/hive/beeline/ common/src/java/org/apache/hadoop/hive/conf/ contrib/src/test/queries/clientnegative/ contrib/src/test/queries/clientpositive/ contrib/src/test/results/c...
Date: Wed, 18 Mar 2015 05:40:11 GMT
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java Wed Mar 18 05:40:07 2015
@@ -21,7 +21,9 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -169,7 +171,9 @@ public class MuxOperator extends Operato
   private transient long[] nextCntrs;
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
+
     // A MuxOperator should only have a single child
     if (childOperatorsArray.length != 1) {
       throw new HiveException(
@@ -204,7 +208,7 @@ public class MuxOperator extends Operato
       cntrs[i] = 0;
       nextCntrs[i] = 1;
     }
-    initializeChildren(hconf);
+    return result;
   }
 
   /**
@@ -230,7 +234,7 @@ public class MuxOperator extends Operato
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     if (isLogInfoEnabled) {
       cntrs[tag]++;
       if (cntrs[tag] == nextCntrs[tag]) {
@@ -247,11 +251,11 @@ public class MuxOperator extends Operato
       } else {
         if (forward[tag]) {
           // No need to evaluate, just forward it.
-          child.processOp(row, tag);
+          child.process(row, tag);
         } else {
           // Call the corresponding handler to evaluate this row and
           // forward the result
-          child.processOp(handlers[tag].process(row), handlers[tag].getTag());
+          child.process(handlers[tag].process(row), handlers[tag].getTag());
         }
       }
     }
@@ -269,7 +273,7 @@ public class MuxOperator extends Operato
     // we cannot pass new tag to this method which is used to get
     // the old tag from the mapping of newTagToOldTag, we bypass
     // this method in MuxOperator and directly call process on children
-    // in processOp() method..
+    // in process() method.
   }
 
   @Override
@@ -308,7 +312,7 @@ public class MuxOperator extends Operato
   protected void closeOp(boolean abort) throws HiveException {
     if (isLogInfoEnabled) {
       for (int i = 0; i < numParents; i++) {
-	LOG.info(id + ", tag=" + i + ", forwarded " + cntrs[i] + " rows");
+        LOG.info(id + ", tag=" + i + ", forwarded " + cntrs[i] + " rows");
       }
     }
   }
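
The hunk above shows the commit's central refactor: initializeOp now returns the Collection<Future<?>> obtained from super.initializeOp(hconf) instead of calling initializeChildren(hconf) itself; the base Operator.initialize() takes over child initialization and waits on any returned futures. A minimal sketch of a subclass written against the new contract (the class name, descriptor type, and setup body are hypothetical, not part of this commit):

    package org.apache.hadoop.hive.ql.exec;

    import java.util.Collection;
    import java.util.concurrent.Future;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.SelectDesc;
    import org.apache.hadoop.hive.ql.plan.api.OperatorType;

    public class ExampleOperator extends Operator<SelectDesc> { // hypothetical subclass
      private static final long serialVersionUID = 1L;

      @Override
      protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
        // Call super first: it sets rootInitializeCalled, which initialize() asserts on.
        Collection<Future<?>> result = super.initializeOp(hconf);
        // ... operator-specific setup goes here; do NOT call initializeChildren(hconf),
        // the base class now does that after initializeOp() returns ...
        return result;
      }

      @Override
      public void process(Object row, int tag) throws HiveException {
        forward(row, inputObjInspectors[tag]); // pass rows through unchanged
      }

      @Override
      public OperatorType getType() {
        return null;
      }
    }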

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java Wed Mar 18 05:40:07 2015
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 /**
@@ -32,9 +34,23 @@ public interface ObjectCache {
 
   /**
    * Retrieve object from cache.
+   *
+   * @param <T>
+   * @param key
+   * @param fn
+   *          function to generate the object if it's not there
+   * @return the last cached object with the key, null if none.
+   */
+  public <T> T retrieve(String key, Callable<T> fn) throws HiveException;
+
+  /**
+   * Retrieve object from cache asynchronously.
+   *
+   * @param <T>
    * @param key
-   * @param fn function to generate the object if it's not there
+   * @param fn
+   *          function to generate the object if it's not there
    * @return the last cached object with the key, null if none.
    */
-  public Object retrieve(String key, Callable<?> fn) throws HiveException;
+  public <T> Future<T> retrieveAsync(String key, Callable<T> fn) throws HiveException;
 }
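
For callers, retrieve is now generic (no cast from Object needed) and retrieveAsync offers a non-blocking variant whose Future is meant to be handed back from initializeOp. A hedged usage sketch; the cache key and the use of Utilities.getMapWork as the loader are illustrative only, and ObjectCacheFactory is assumed unchanged:

    import java.util.concurrent.Callable;
    import java.util.concurrent.Future;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.ObjectCache;
    import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.MapWork;

    public class CacheUsageSketch {
      Future<MapWork> example(final Configuration hconf) throws HiveException {
        ObjectCache cache = ObjectCacheFactory.getCache(hconf); // assuming the existing factory
        Callable<MapWork> loader = new Callable<MapWork>() {
          @Override
          public MapWork call() throws Exception {
            return Utilities.getMapWork(hconf); // existing plan-deserialization helper
          }
        };
        // Synchronous path: the result is typed, no cast required any more.
        MapWork work = cache.retrieve("example_plan_key", loader);
        if (work == null) {
          throw new HiveException("plan not found"); // javadoc: null if none
        }
        // Asynchronous path: return this Future from initializeOp() so that
        // Operator.initialize() waits on it before completeInitializationOp() runs.
        return cache.retrieveAsync("example_plan_key", loader);
      }
    }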

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Mar 18 05:40:07 2015
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -68,6 +69,7 @@ public abstract class Operator<T extends
   protected List<Operator<? extends OperatorDesc>> parentOperators;
   protected String operatorId;
   private transient ExecMapperContext execContext;
+  private transient boolean rootInitializeCalled = false;
 
   private static AtomicInteger seqId;
 
@@ -101,13 +103,13 @@ public abstract class Operator<T extends
   // dummy operator (for not increasing seqId)
   private Operator(String name) {
     id = name;
+    initOperatorId();
+    childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+    parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
   }
 
   public Operator() {
-    id = String.valueOf(seqId.getAndIncrement());
-    childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
-    parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
-    initOperatorId();
+    this(String.valueOf(seqId.getAndIncrement()));
   }
 
   public static void resetId() {
@@ -252,11 +254,6 @@ public abstract class Operator<T extends
   public void setReporter(Reporter rep) {
     reporter = rep;
 
-    // the collector is same across all operators
-    if (childOperators == null) {
-      return;
-    }
-
     for (Operator<? extends OperatorDesc> op : childOperators) {
       op.setReporter(rep);
     }
@@ -266,11 +263,6 @@ public abstract class Operator<T extends
   public void setOutputCollector(OutputCollector out) {
     this.out = out;
 
-    // the collector is same across all operators
-    if (childOperators == null) {
-      return;
-    }
-
     for (Operator<? extends OperatorDesc> op : childOperators) {
       op.setOutputCollector(out);
     }
@@ -282,10 +274,6 @@ public abstract class Operator<T extends
   public void setAlias(String alias) {
     this.alias = alias;
 
-    if (childOperators == null) {
-      return;
-    }
-
     for (Operator<? extends OperatorDesc> op : childOperators) {
       op.setAlias(alias);
     }
@@ -306,9 +294,6 @@ public abstract class Operator<T extends
    *         otherwise
    */
   protected boolean areAllParentsInitialized() {
-    if (parentOperators == null) {
-      return true;
-    }
     for (Operator<? extends OperatorDesc> parent : parentOperators) {
       if (parent == null) {
         //return true;
@@ -332,7 +317,7 @@ public abstract class Operator<T extends
    * @throws HiveException
    */
   @SuppressWarnings("unchecked")
-  public void initialize(Configuration hconf, ObjectInspector[] inputOIs)
+  public final void initialize(Configuration hconf, ObjectInspector[] inputOIs)
       throws HiveException {
     if (state == State.INIT) {
       return;
@@ -344,7 +329,7 @@ public abstract class Operator<T extends
     }
 
     if (isLogInfoEnabled) {
-      LOG.info("Initializing Self " + this);
+      LOG.info("Initializing operator " + this);
     }
 
     if (inputOIs != null) {
@@ -352,50 +337,69 @@ public abstract class Operator<T extends
     }
 
     // initialize structure to maintain child op info. operator tree changes
-    // while
-    // initializing so this need to be done here instead of initialize() method
-    if (childOperators != null && !childOperators.isEmpty()) {
-      childOperatorsArray = new Operator[childOperators.size()];
-      for (int i = 0; i < childOperatorsArray.length; i++) {
-        childOperatorsArray[i] = childOperators.get(i);
-      }
-      childOperatorsTag = new int[childOperatorsArray.length];
-      for (int i = 0; i < childOperatorsArray.length; i++) {
-        List<Operator<? extends OperatorDesc>> parentOperators = childOperatorsArray[i]
-            .getParentOperators();
-        if (parentOperators == null) {
-          throw new HiveException("Hive internal error: parent is null in "
-              + childOperatorsArray[i].getClass() + "!");
-        }
-        childOperatorsTag[i] = parentOperators.indexOf(this);
-        if (childOperatorsTag[i] == -1) {
-          throw new HiveException(
-              "Hive internal error: cannot find parent in the child operator!");
-        }
+    // while initializing so this needs to be done here instead of in the constructor
+    childOperatorsArray = new Operator[childOperators.size()];
+    for (int i = 0; i < childOperatorsArray.length; i++) {
+      childOperatorsArray[i] = childOperators.get(i);
+    }
+    childOperatorsTag = new int[childOperatorsArray.length];
+    for (int i = 0; i < childOperatorsArray.length; i++) {
+      List<Operator<? extends OperatorDesc>> parentOperators =
+          childOperatorsArray[i].getParentOperators();
+      childOperatorsTag[i] = parentOperators.indexOf(this);
+      if (childOperatorsTag[i] == -1) {
+        throw new HiveException("Hive internal error: cannot find parent in the child operator!");
       }
     }
 
     if (inputObjInspectors.length == 0) {
       throw new HiveException("Internal Error during operator initialization.");
     }
+
     // derived classes can set this to different object if needed
     outputObjInspector = inputObjInspectors[0];
 
-    //pass the exec context to child operators
-    passExecContext(this.execContext);
+    Collection<Future<?>> asyncInitOperations = initializeOp(hconf);
 
-    initializeOp(hconf);
-
-    // sanity check
-    if (childOperatorsArray == null
-        && !(childOperators == null || childOperators.isEmpty())) {
-      throw new HiveException(
-          "Internal Hive error during operator initialization.");
+    // sanity checks
+    if (!rootInitializeCalled
+        || asyncInitOperations == null
+        || childOperatorsArray.length != childOperators.size()) {
+      throw new AssertionError("Internal error during operator initialization");
     }
 
     if (isLogInfoEnabled) {
       LOG.info("Initialization Done " + id + " " + getName());
     }
+
+    initializeChildren(hconf);
+
+    // let's wait on the async ops before continuing
+    completeInitialization(asyncInitOperations);
+  }
+
+  private void completeInitialization(Collection<Future<?>> fs) throws HiveException {
+    Object[] os = new Object[fs.size()];
+    int i = 0;
+    for (Future<?> f : fs) {
+      try {
+        os[i++] = f.get();
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+    }
+    completeInitializationOp(os);
+  }
+
+  /**
+   * This method can be used to retrieve the results from async operations
+   * started at init time - before the operator pipeline is started.
+   *
+   * @param os
+   * @throws HiveException
+   */
+  protected void completeInitializationOp(Object[] os) throws HiveException {
+    // no-op default
   }
 
   public void initializeLocalWork(Configuration hconf) throws HiveException {
@@ -410,8 +414,9 @@ public abstract class Operator<T extends
   /**
    * Operator specific initialization.
    */
-  protected void initializeOp(Configuration hconf) throws HiveException {
-    initializeChildren(hconf);
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    rootInitializeCalled = true;
+    return new ArrayList<Future<?>>();
   }
 
   /**
@@ -430,8 +435,7 @@ public abstract class Operator<T extends
       LOG.info("Initializing children of " + id + " " + getName());
     }
     for (int i = 0; i < childOperatorsArray.length; i++) {
-      childOperatorsArray[i].initialize(hconf, outputObjInspector,
-          childOperatorsTag[i]);
+      childOperatorsArray[i].initialize(hconf, outputObjInspector, childOperatorsTag[i]);
       if (reporter != null) {
         childOperatorsArray[i].setReporter(reporter);
       }
@@ -443,10 +447,8 @@ public abstract class Operator<T extends
    */
   public void passExecContext(ExecMapperContext execContext) {
     this.setExecContext(execContext);
-    if(childOperators != null) {
-      for (int i = 0; i < childOperators.size(); i++) {
+    for (int i = 0; i < childOperators.size(); i++) {
         childOperators.get(i).passExecContext(execContext);
-      }
     }
   }
 
@@ -501,7 +503,7 @@ public abstract class Operator<T extends
    *          Rows with the same tag should have exactly the same rowInspector
    *          all the time.
    */
-  public abstract void processOp(Object row, int tag) throws HiveException;
+  public abstract void process(Object row, int tag) throws HiveException;
 
   protected final void defaultStartGroup() throws HiveException {
     if (isLogDebugEnabled) {
@@ -598,7 +600,7 @@ public abstract class Operator<T extends
     // check if all parents are finished
     if (!allInitializedParentsAreClosed()) {
       if (isLogDebugEnabled) {
-	LOG.debug("Not all parent operators are closed. Not closing.");
+        LOG.debug("Not all parent operators are closed. Not closing.");
       }
       return;
     }
@@ -822,7 +824,7 @@ public abstract class Operator<T extends
   protected void forward(Object row, ObjectInspector rowInspector)
       throws HiveException {
 
-    if ((childOperatorsArray == null) || (getDone())) {
+    if (getDone()) {
       return;
     }
 
@@ -832,12 +834,12 @@ public abstract class Operator<T extends
       if (o.getDone()) {
         childrenDone++;
       } else {
-        o.processOp(row, childOperatorsTag[i]);
+        o.process(row, childOperatorsTag[i]);
       }
     }
 
     // if all children are done, this operator is also done
-    if (childrenDone == childOperatorsArray.length) {
+    if (childrenDone != 0 && childrenDone == childOperatorsArray.length) {
       setDone(true);
     }
   }
@@ -878,7 +880,7 @@ public abstract class Operator<T extends
   public void logStats() {
     if (isLogInfoEnabled) {
       for (String e : statsMap.keySet()) {
-	LOG.info(e.toString() + ":" + statsMap.get(e).toString());
+        LOG.info(e.toString() + ":" + statsMap.get(e).toString());
       }
     }
   }
@@ -969,7 +971,7 @@ public abstract class Operator<T extends
    * Initialize an array of ExprNodeEvaluator and return the result
    * ObjectInspectors.
    */
-  protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals,
+  protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator<?>[] evals,
       ObjectInspector rowInspector) throws HiveException {
     ObjectInspector[] result = new ObjectInspector[evals.length];
     for (int i = 0; i < evals.length; i++) {
@@ -982,7 +984,7 @@ public abstract class Operator<T extends
    * Initialize an array of ExprNodeEvaluator from start, for specified length
    * and return the result ObjectInspectors.
    */
-  protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals,
+  protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator<?>[] evals,
       int start, int length,
       ObjectInspector rowInspector) throws HiveException {
     ObjectInspector[] result = new ObjectInspector[length];
@@ -997,7 +999,7 @@ public abstract class Operator<T extends
    * StructObjectInspector with integer field names.
    */
   protected static StructObjectInspector initEvaluatorsAndReturnStruct(
-      ExprNodeEvaluator[] evals, List<String> outputColName,
+      ExprNodeEvaluator<?>[] evals, List<String> outputColName,
       ObjectInspector rowInspector) throws HiveException {
     ObjectInspector[] fieldObjectInspectors = initEvaluators(evals,
         rowInspector);
@@ -1059,12 +1061,6 @@ public abstract class Operator<T extends
 
   public void setExecContext(ExecMapperContext execContext) {
     this.execContext = execContext;
-    if(this.childOperators != null) {
-      for (int i = 0; i<this.childOperators.size();i++) {
-        Operator<? extends OperatorDesc> op = this.childOperators.get(i);
-        op.setExecContext(execContext);
-      }
-    }
   }
 
   // The input file has changed - every operator can invoke specific action
@@ -1128,6 +1124,7 @@ public abstract class Operator<T extends
    * @return Cloned operator
    * @throws CloneNotSupportedException
    */
+  @SuppressWarnings("unchecked")
   public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
     T descClone = (T) conf.clone();
     Operator<? extends OperatorDesc> ret =
@@ -1148,11 +1145,6 @@ public abstract class Operator<T extends
       throws CloneNotSupportedException {
     Operator<? extends OperatorDesc> newOp = this.cloneOp();
     newOp.setParentOperators(this.parentOperators);
-    // Fix parent in all children
-    if (this.getChildOperators() == null) {
-      newOp.setChildOperators(null);
-      return newOp;
-    }
     List<Operator<? extends OperatorDesc>> newChildren =
         new ArrayList<Operator<? extends OperatorDesc>>();
 
@@ -1301,12 +1293,13 @@ public abstract class Operator<T extends
     if (conf != null) {
       return conf.getStatistics();
     }
+
     return null;
   }
 
   public OpTraits getOpTraits() {
     if (conf != null) {
-      return conf.getOpTraits();
+      return conf.getTraits();
     }
 
     return null;
@@ -1314,36 +1307,48 @@ public abstract class Operator<T extends
 
   public void setOpTraits(OpTraits metaInfo) {
     if (isLogDebugEnabled) {
-      LOG.debug("Setting traits ("+metaInfo+") on "+this);
+      LOG.debug("Setting traits (" + metaInfo + ") on " + this);
     }
     if (conf != null) {
-      conf.setOpTraits(metaInfo);
+      conf.setTraits(metaInfo);
     } else {
-      LOG.warn("Cannot set traits when there's no descriptor: "+this);
+      LOG.warn("Cannot set traits when there's no descriptor: " + this);
     }
   }
 
   public void setStatistics(Statistics stats) {
     if (isLogDebugEnabled) {
-      LOG.debug("Setting stats ("+stats+") on "+this);
+      LOG.debug("Setting stats (" + stats + ") on " + this);
     }
     if (conf != null) {
       conf.setStatistics(stats);
     } else {
-      LOG.warn("Cannot set stats when there's no descriptor: "+this);
+      LOG.warn("Cannot set stats when there's no descriptor: " + this);
     }
   }
 
+  @SuppressWarnings("rawtypes")
   public static Operator createDummy() {
     return new DummyOperator();
   }
 
+  @SuppressWarnings({ "serial", "unchecked", "rawtypes" })
   private static class DummyOperator extends Operator {
     public DummyOperator() { super("dummy"); }
+
     @Override
-    public void processOp(Object row, int tag) { }
+    public void process(Object row, int tag) {
+    }
+
     @Override
-    public OperatorType getType() { return null; }
+    public OperatorType getType() {
+      return null;
+    }
+
+    @Override
+    protected Collection<Future<?>> initializeOp(Configuration conf) {
+      return childOperators;
+    }
   }
 
   public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
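
Taken together, Operator.initialize() now runs in three phases: initializeOp() returns any pending futures, initializeChildren() wires up the children, and completeInitialization() blocks on the futures and hands their results to completeInitializationOp(). A sketch of a subclass plugging into the last phase; the operator, cache key, and loaded value are hypothetical:

    package org.apache.hadoop.hive.ql.exec;

    import java.util.Collection;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Future;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.SelectDesc;
    import org.apache.hadoop.hive.ql.plan.api.OperatorType;

    public class AsyncInitSketchOperator extends Operator<SelectDesc> { // hypothetical
      private static final long serialVersionUID = 1L;
      private transient Object loadedTable; // whatever the async step produced

      @Override
      protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
        Collection<Future<?>> result = super.initializeOp(hconf);
        ObjectCache cache = ObjectCacheFactory.getCache(hconf);
        // Queue the slow load; initialize() waits on every returned Future
        // only after the children have been initialized.
        result.add(cache.retrieveAsync("example_table_key", new Callable<Object>() {
          @Override
          public Object call() throws Exception {
            return new Object(); // stand-in for an expensive hash-table build
          }
        }));
        return result;
      }

      @Override
      protected void completeInitializationOp(Object[] os) throws HiveException {
        // os carries one entry per Future, in the collection's iteration order.
        loadedTable = os[0];
      }

      @Override
      public void process(Object row, int tag) throws HiveException {
        forward(row, inputObjInspectors[tag]);
      }

      @Override
      public OperatorType getType() {
        return null;
      }
    }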

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Wed Mar 18 05:40:07 2015
@@ -18,6 +18,12 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
@@ -62,16 +68,13 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * OperatorFactory.
  *
  */
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public final class OperatorFactory {
+  protected static transient final Log LOG = LogFactory.getLog(OperatorFactory.class);
   private static final List<OpTuple> opvec;
   private static final List<OpTuple> vectorOpvec;
 
@@ -227,9 +230,6 @@ public final class OperatorFactory {
     // Add this parent to the children
     for (Operator<? extends OperatorDesc> op : oplist) {
       List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
-      if (parents == null) {
-        parents = new ArrayList<Operator<? extends OperatorDesc>>();
-      }
       parents.add(ret);
       op.setParentOperators(parents);
     }
@@ -259,9 +259,6 @@ public final class OperatorFactory {
     // Add the new operator as child of each of the passed in operators
     for (Operator op : oplist) {
       List<Operator> children = op.getChildOperators();
-      if (children == null) {
-        children = new ArrayList<Operator>();
-      }
       children.add(ret);
       op.setChildOperators(children);
     }
@@ -286,17 +283,13 @@ public final class OperatorFactory {
     Operator<T> ret = get((Class<T>) conf.getClass());
     ret.setConf(conf);
     if (oplist.size() == 0) {
-      return (ret);
+      return ret;
     }
 
     // Add the new operator as child of each of the passed in operators
     for (Operator op : oplist) {
       List<Operator> children = op.getChildOperators();
-      if (children == null) {
-        children = new ArrayList<Operator>();
-      }
       children.add(ret);
-      op.setChildOperators(children);
     }
 
     // add parents for the newly created operator
@@ -308,7 +301,7 @@ public final class OperatorFactory {
 
     ret.setParentOperators(parent);
 
-    return (ret);
+    return ret;
   }
 
   /**
@@ -318,7 +311,7 @@ public final class OperatorFactory {
       RowSchema rwsch, Operator... oplist) {
     Operator<T> ret = getAndMakeChild(conf, oplist);
     ret.setSchema(rwsch);
-    return (ret);
+    return ret;
   }
 
   /**

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Wed Mar 18 05:40:07 2015
@@ -122,7 +122,7 @@ public class OperatorUtils {
       if(op.getName().equals(ReduceSinkOperator.getOperatorName())) {
         ReduceSinkOperator rs = ((ReduceSinkOperator)op);
         if (outMap.containsKey(rs.getConf().getOutputName())) {
-          LOG.info("Setting output collector: " + rs + " --> " 
+          LOG.info("Setting output collector: " + rs + " --> "
             + rs.getConf().getOutputName());
           rs.setOutputCollector(outMap.get(rs.getConf().getOutputName()));
         }
@@ -234,9 +234,7 @@ public class OperatorUtils {
             resultMap.put(clazz, op);
           }
         }
-        if (op.getChildOperators() != null) {
-          allChildren.addAll(op.getChildOperators());
-        }
+        allChildren.addAll(op.getChildOperators());
       }
       ops = allChildren;
     }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java Wed Mar 18 05:40:07 2015
@@ -56,7 +56,7 @@ public class OrcFileMergeOperator extend
   private FSDataInputStream fdis;
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     Object[] keyValue = (Object[]) row;
     processKeyValuePairs(keyValue[0], keyValue[1]);
   }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Wed Mar 18 05:40:07 2015
@@ -19,12 +19,13 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Stack;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -66,7 +67,8 @@ public class PTFOperator extends Operato
    * 4. Create input partition to store rows coming from previous operator
    */
   @Override
-  protected void initializeOp(Configuration jobConf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration jobConf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(jobConf);
     hiveConf = jobConf;
     isMapOperator = conf.isMapSide();
 
@@ -84,8 +86,7 @@ public class PTFOperator extends Operato
     ptfInvocation = setupChain();
     ptfInvocation.initializeStreaming(jobConf, isMapOperator);
     firstMapRow = true;
-
-    super.initializeOp(jobConf);
+    return result;
   }
 
   @Override
@@ -96,7 +97,7 @@ public class PTFOperator extends Operato
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     if (!isMapOperator ) {
       /*
       * check if current row belongs to the current accumulated Partition:

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java Wed Mar 18 05:40:07 2015
@@ -43,7 +43,7 @@ public class RCFileMergeOperator
   int columnNumber = 0;
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     Object[] keyValue = (Object[]) row;
     processKeyValuePairs(keyValue[0], keyValue[1]);
   }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Wed Mar 18 05:40:07 2015
@@ -18,12 +18,16 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -48,14 +52,12 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.hash.MurmurHash;
 
-import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
-
 /**
  * Reduce Sink Operator sends output to the reduce stage.
  **/
@@ -153,7 +155,8 @@ public class ReduceSinkOperator extends
   private final transient LongWritable recordCounter = new LongWritable();
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     try {
 
       numRows = 0;
@@ -237,12 +240,12 @@ public class ReduceSinkOperator extends
       useUniformHash = conf.getReducerTraits().contains(UNIFORM);
 
       firstRow = true;
-      initializeChildren(hconf);
     } catch (Exception e) {
       String msg = "Error initializing ReduceSinkOperator: " + e.getMessage();
       LOG.error(msg, e);
       throw new RuntimeException(e);
     }
+    return result;
   }
 
 
@@ -291,7 +294,7 @@ public class ReduceSinkOperator extends
 
   @Override
   @SuppressWarnings("unchecked")
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     try {
       ObjectInspector rowInspector = inputObjInspectors[tag];
       if (firstRow) {
@@ -514,6 +517,7 @@ public class ReduceSinkOperator extends
     return keyWritable;
   }
 
+  @Override
   public void collect(byte[] key, byte[] value, int hash) throws IOException {
     HiveKey keyWritable = new HiveKey(key, hash);
     BytesWritable valueWritable = new BytesWritable(value);
@@ -608,6 +612,7 @@ public class ReduceSinkOperator extends
     return inputAliases;
   }
 
+  @Override
   public void setOutputCollector(OutputCollector _out) {
     this.out = _out;
   }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Wed Mar 18 05:40:07 2015
@@ -21,10 +21,12 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -91,7 +93,7 @@ public class SMBMapJoinOperator extends
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
 
     // If there is a sort-merge join followed by a regular join, the SMBJoinOperator may not
     // get initialized at all. Consider the following query:
@@ -99,7 +101,7 @@ public class SMBMapJoinOperator extends
     // For the mapper processing C, The SMJ is not initialized, no need to close it either.
     initDone = true;
 
-    super.initializeOp(hconf);
+    Collection<Future<?>> result = super.initializeOp(hconf);
 
     closeCalled = false;
 
@@ -154,6 +156,7 @@ public class SMBMapJoinOperator extends
       }
       foundNextKeyGroup[pos] = false;
     }
+    return result;
   }
 
   @Override
@@ -195,7 +198,7 @@ public class SMBMapJoinOperator extends
       HiveInputFormat.pushFilters(jobClone, ts);
 
 
-      ts.setExecContext(getExecContext());
+      ts.passExecContext(getExecContext());
 
       FetchOperator fetchOp = new FetchOperator(fetchWork, jobClone);
       ts.initialize(jobClone, new ObjectInspector[]{fetchOp.getOutputObjectInspector()});
@@ -231,7 +234,7 @@ public class SMBMapJoinOperator extends
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
 
     if (tag == posBigTable) {
       if (inputFileChanged) {
@@ -555,7 +558,7 @@ public class SMBMapJoinOperator extends
         fetchDone[tag] = true;
         return;
       }
-      forwardOp.processOp(row.o, tag);
+      forwardOp.process(row.o, tag);
       // check if any operator had a fatal error or early exit during
       // execution
       if (forwardOp.getDone()) {
@@ -803,7 +806,7 @@ public class SMBMapJoinOperator extends
 
        // Pass the row through the operator tree. It is guaranteed that not more than 1 row can
         // be produced from a input row.
-        forwardOp.processOp(nextRow.o, 0);
+        forwardOp.process(nextRow.o, 0);
         nextRow = sinkOp.getResult();
 
         // It is possible that the row got absorbed in the operator tree.

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Wed Mar 18 05:40:07 2015
@@ -27,12 +27,14 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -152,7 +154,6 @@ public class ScriptOperator extends Oper
         if (bl != null && bl.length() > 0) {
           String[] bls = bl.split(",");
           for (String b : bls) {
-            b.replaceAll(".", "_");
             blackListedConfEntries.add(b);
           }
         }
@@ -260,7 +261,8 @@ public class ScriptOperator extends Oper
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     firstRow = true;
 
     statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
@@ -281,11 +283,10 @@ public class ScriptOperator extends Oper
 
       outputObjInspector = scriptOutputDeserializer.getObjectInspector();
 
-      // initialize all children before starting the script
-      initializeChildren(hconf);
     } catch (Exception e) {
       throw new HiveException(ErrorMsg.SCRIPT_INIT_ERROR.getErrorCodedMsg(), e);
     }
+    return result;
   }
 
   boolean isBrokenPipeException(IOException e) {
@@ -322,7 +323,7 @@ public class ScriptOperator extends Oper
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     // initialize the user's process only when you receive the first row
     if (firstRow) {
       firstRow = false;
@@ -574,6 +575,7 @@ public class ScriptOperator extends Oper
       this.rowInspector = rowInspector;
     }
 
+    @Override
     public void processLine(Writable line) throws HiveException {
       try {
         row = scriptOutputDeserializer.deserialize(line);
@@ -584,6 +586,7 @@ public class ScriptOperator extends Oper
       forward(row, rowInspector);
     }
 
+    @Override
     public void close() {
     }
   }
@@ -652,6 +655,7 @@ public class ScriptOperator extends Oper
       }
     }
 
+    @Override
     public void processLine(Writable line) throws HiveException {
 
       String stringLine = line.toString();
@@ -694,6 +698,7 @@ public class ScriptOperator extends Oper
       bytesCopied += len;
     }
 
+    @Override
     public void close() {
     }
 

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Wed Mar 18 05:40:07 2015
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,12 +44,12 @@ public class SelectOperator extends Oper
   private transient boolean isSelectStarNoCompute = false;
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     // Just forward the row as is
     if (conf.isSelStarNoCompute()) {
-      initializeChildren(hconf);
       isSelectStarNoCompute = true;
-      return;
+      return result;
     }
     List<ExprNodeDesc> colList = conf.getColList();
     eval = new ExprNodeEvaluator[colList.size()];
@@ -64,11 +66,11 @@ public class SelectOperator extends Oper
     }
     outputObjInspector = initEvaluatorsAndReturnStruct(eval, conf.getOutputColumnNames(),
         inputObjInspectors[0]);
-    initializeChildren(hconf);
+    return result;
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     if (isSelectStarNoCompute) {
       forward(row, inputObjInspectors[tag]);
       return;

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java Wed Mar 18 05:40:07 2015
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.Future;
 
 import org.apache.commons.io.FileExistsException;
 import org.apache.commons.logging.Log;
@@ -50,7 +52,7 @@ public class SparkHashTableSinkOperator
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   protected static final Log LOG = LogFactory.getLog(SparkHashTableSinkOperator.class.getName());
 
-  private HashTableSinkOperator htsOperator;
+  private final HashTableSinkOperator htsOperator;
 
   // The position of this table
   private byte tag;
@@ -60,18 +62,20 @@ public class SparkHashTableSinkOperator
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     ObjectInspector[] inputOIs = new ObjectInspector[conf.getTagLength()];
     inputOIs[tag] = inputObjInspectors[0];
     conf.setTagOrder(new Byte[]{ tag });
     htsOperator.setConf(conf);
     htsOperator.initialize(hconf, inputOIs);
+    return result;
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     // Ignore the tag passed in, which should be 0, not what we want
-    htsOperator.processOp(row, this.tag);
+    htsOperator.process(row, this.tag);
   }
 
   @Override

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Wed Mar 18 05:40:07 2015
@@ -20,9 +20,11 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -84,7 +86,7 @@ public class TableScanOperator extends O
    * operator will be enhanced to read the table.
    **/
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     if (rowLimit >= 0 && currCount++ >= rowLimit) {
       setDone(true);
       return;
@@ -153,9 +155,9 @@ public class TableScanOperator extends O
           values.add(o == null ? defaultPartitionName : o.toString());
         }
         partitionSpecs = FileUtils.makePartName(conf.getPartColumns(), values);
-	if (isLogInfoEnabled) {
-	  LOG.info("Stats Gathering found a new partition spec = " + partitionSpecs);
-	}
+        if (isLogInfoEnabled) {
+          LOG.info("Stats Gathering found a new partition spec = " + partitionSpecs);
+        }
       }
      // find which column contains the raw data size (both partitioned and non partitioned)
       int uSizeColumn = -1;
@@ -191,16 +193,17 @@ public class TableScanOperator extends O
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
-    initializeChildren(hconf);
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     inputFileChanged = false;
 
     if (conf == null) {
-      return;
+      return result;
     }
+
     rowLimit = conf.getRowLimit();
     if (!conf.isGatherStats()) {
-      return;
+      return result;
     }
 
     this.hconf = hconf;
@@ -216,9 +219,9 @@ public class TableScanOperator extends O
     stats = new HashMap<String, Stat>();
     if (conf.getPartColumns() == null || conf.getPartColumns().size() == 0) {
       // NON PARTITIONED table
-      return;
+      return result;
     }
-
+    return result;
   }
 
   @Override
@@ -282,7 +285,7 @@ public class TableScanOperator extends O
     if (!statsPublisher.connect(jc)) {
       // just return, stats gathering should not block the main query.
       if (isLogInfoEnabled) {
-	LOG.info("StatsPublishing error: cannot connect to database.");
+        LOG.info("StatsPublishing error: cannot connect to database.");
       }
       if (isStatsReliable) {
         throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java Wed Mar 18 05:40:07 2015
@@ -33,8 +33,8 @@ public class TezDummyStoreOperator exten
    * the records.
    */
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
-    super.processOp(row, tag);
+  public void process(Object row, int tag) throws HiveException {
+    super.process(row, tag);
     forward(result.o, outputObjInspector);
   }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java Wed Mar 18 05:40:07 2015
@@ -20,7 +20,9 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -57,7 +59,8 @@ public class UDTFOperator extends Operat
   transient AutoProgressor autoProgressor;
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     genericUDTF = conf.getGenericUDTF();
     collector = new UDTFCollector(this);
 
@@ -90,13 +93,11 @@ public class UDTFOperator extends Operat
               hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS));
       autoProgressor.go();
     }
-
-    // Initialize the rest of the operator DAG
-    super.initializeOp(hconf);
+    return result;
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     // The UDTF expects arguments in an object[]
     StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
     List<? extends StructField> fields = soi.getAllStructFieldRefs();

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Wed Mar 18 05:40:07 2015
@@ -20,7 +20,9 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -54,7 +56,8 @@ public class UnionOperator extends Opera
    * needsTransform[].
    */
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
 
     int parents = parentOperators.size();
     parentObjInspectors = new StructObjectInspector[parents];
@@ -116,11 +119,11 @@ public class UnionOperator extends Opera
             + "] from " + inputObjInspectors[p] + " to " + outputObjInspector);
       }
     }
-    initializeChildren(hconf);
+    return result;
   }
 
   @Override
-  public synchronized void processOp(Object row, int tag) throws HiveException {
+  public synchronized void process(Object row, int tag) throws HiveException {
 
     StructObjectInspector soi = parentObjInspectors[tag];
     List<? extends StructField> fields = parentFields[tag];

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Wed Mar 18 05:40:07 2015
@@ -24,9 +24,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -105,6 +105,7 @@ public class ExecMapper extends MapReduc
       }
       mo.setConf(mrwork);
       // initialize map operator
+      mo.initialize(job, null);
       mo.setChildren(job);
       l4j.info(mo.dump(0));
       // initialize map local work
@@ -113,9 +114,9 @@ public class ExecMapper extends MapReduc
 
       MapredContext.init(true, new JobConf(jc));
 
-      mo.setExecContext(execContext);
+      mo.passExecContext(execContext);
       mo.initializeLocalWork(jc);
-      mo.initialize(jc, null);
+      mo.initializeMapOperator(jc);
 
       if (localWork == null) {
         return;
@@ -126,7 +127,7 @@ public class ExecMapper extends MapReduc
       l4j.info("Initializing dummy operator");
       List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
       for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
-        dummyOp.setExecContext(execContext);
+        dummyOp.passExecContext(execContext);
         dummyOp.initialize(jc,null);
       }
     } catch (Throwable e) {
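
The ExecMapper bootstrap is reordered as a consequence: base initialization runs before the children are attached, and the tree-level pass happens through the new initializeMapOperator entry point. A comment-annotated restatement of the sequence from the hunk above; the method names are taken from the diff, while the surrounding glue is hypothetical:

    package org.apache.hadoop.hive.ql.exec.mr;

    import org.apache.hadoop.hive.ql.exec.MapOperator;
    import org.apache.hadoop.hive.ql.exec.MapredContext;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.MapWork;
    import org.apache.hadoop.mapred.JobConf;

    public class BootstrapOrderSketch {
      void configure(MapOperator mo, MapWork mrwork, JobConf job, JobConf jc,
          ExecMapperContext execContext) throws HiveException {
        mo.setConf(mrwork);
        mo.initialize(job, null);        // base initialization now precedes setChildren()
        mo.setChildren(job);             // attach children derived from the plan
        MapredContext.init(true, new JobConf(jc));
        mo.passExecContext(execContext); // recursive variant replaces setExecContext()
        mo.initializeLocalWork(jc);
        mo.initializeMapOperator(jc);    // new entry point that initializes the operator tree
      }
    }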

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Wed Mar 18 05:40:07 2015
@@ -232,7 +232,7 @@ public class ExecReducer extends MapRedu
         row.add(valueObject[tag]);
 
         try {
-          reducer.processOp(row, tag);
+          reducer.process(row, tag);
         } catch (Exception e) {
           String rowString = null;
           try {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java Wed Mar 18 05:40:07 2015
@@ -32,11 +32,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -62,7 +61,8 @@ public class HashTableLoader implements
   private MapJoinDesc desc;
 
   @Override
-  public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
+  public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
+      MapJoinOperator joinOp) {
     this.context = context;
     this.hconf = hconf;
     this.joinOp = joinOp;

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Wed Mar 18 05:40:07 2015
@@ -404,7 +404,7 @@ public class MapredLocalTask extends Tas
         if (row == null) {
           break;
         }
-        forwardOp.processOp(row.o, 0);
+        forwardOp.process(row.o, 0);
       }
       forwardOp.flush();
     }
@@ -445,7 +445,7 @@ public class MapredLocalTask extends Tas
       Operator<? extends OperatorDesc> forwardOp = work.getAliasToWork().get(alias);
 
       // put the exe context into all the operators
-      forwardOp.setExecContext(execContext);
+      forwardOp.passExecContext(execContext);
       // All the operators need to be initialized before process
       FetchOperator fetchOp = entry.getValue();
       JobConf jobConf = fetchOpJobConfMap.get(fetchOp);

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java Wed Mar 18 05:40:07 2015
@@ -19,6 +19,10 @@
 package org.apache.hadoop.hive.ql.exec.mr;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,18 +36,59 @@ import org.apache.hadoop.hive.ql.metadat
 public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
 
   private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
+  private static final boolean isInfoEnabled = LOG.isInfoEnabled();
 
   @Override
   public void release(String key) {
     // nothing to do
+    if (isInfoEnabled) {
+      LOG.info(key + " no longer needed");
+    }
   }
 
   @Override
-  public Object retrieve(String key, Callable<?> fn) throws HiveException {
+  public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
     try {
+      if (isInfoEnabled) {
+        LOG.info("Creating " + key);
+      }
       return fn.call();
     } catch (Exception e) {
       throw new HiveException(e);
     }
   }
+
+  @Override
+  public <T> Future<T> retrieveAsync(String key, Callable<T> fn) throws HiveException {
+    final T value = retrieve(key, fn);
+
+    return new Future<T>() {
+
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return false;
+      }
+
+      @Override
+      public boolean isDone() {
+        return true;
+      }
+
+      @Override
+      public T get() throws InterruptedException, ExecutionException {
+        return value;
+      }
+
+      @Override
+      public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
+          TimeoutException {
+        return value;
+      }
+    };
+  }
 }
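
The MR ObjectCache computes the value synchronously and wraps it in an already-completed Future, since plain MapReduce has no per-process cache or thread pool to defer to. The anonymous class is the pre-Java-8 spelling; on Java 8+ the same effect is a one-liner, shown here as a standalone illustration:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    public class CompletedFutureDemo {
      public static void main(String[] args) throws Exception {
        // Equivalent of the hand-rolled Future above: already done,
        // not cancellable, get() returns immediately.
        Future<String> f = CompletableFuture.completedFuture("plan");
        System.out.println(f.isDone());   // true
        System.out.println(f.get());      // "plan"
      }
    }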

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java Wed Mar 18 05:40:07 2015
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -58,7 +59,8 @@ public class HashTableLoader implements
   private MapJoinDesc desc;
 
   @Override
-  public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
+  public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
+      MapJoinOperator joinOp) {
     this.context = context;
     this.hconf = hconf;
     this.joinOp = joinOp;
@@ -66,9 +68,9 @@ public class HashTableLoader implements
   }
 
   @Override
-  public void load(
-      MapJoinTableContainer[] mapJoinTables,
-      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
+  public void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage)
+      throws HiveException {
 
     // Note: it's possible that a MJ operator is in a ReduceWork, in which case the
     // currentInputPath will be null. But, since currentInputPath is only interesting

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Wed Mar 18 05:40:07 2015
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
@@ -29,7 +33,6 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -38,10 +41,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
 
 /**
  * Clone from ExecMapper. SparkMapRecordHandler is the bridge between the spark framework and
@@ -61,6 +60,7 @@ public class SparkMapRecordHandler exten
   private boolean isLogInfoEnabled = false;
   private ExecMapperContext execContext;
 
+  @Override
   public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
     super.init(job, output, reporter);
@@ -81,6 +81,7 @@ public class SparkMapRecordHandler exten
       mo.setConf(mrwork);
 
       // initialize map operator
+      mo.initialize(jc, null);
       mo.setChildren(job);
       LOG.info(mo.dump(0));
       // initialize map local work
@@ -90,9 +91,9 @@ public class SparkMapRecordHandler exten
       MapredContext.init(true, new JobConf(jc));
       MapredContext.get().setReporter(reporter);
 
-      mo.setExecContext(execContext);
+      mo.passExecContext(execContext);
       mo.initializeLocalWork(jc);
-      mo.initialize(jc, null);
+      mo.initializeMapOperator(jc);
 
       OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
       mo.setReporter(rp);
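
The map-side setup order changes here: the base initialize(jc, null) now runs before setChildren(job), and the final step becomes the new initializeMapOperator(jc) instead of a second initialize call. As read from the hunk, the resulting lifecycle is:

    // Setup sequence after this patch, collected from the hunk above.
    mo.setConf(mrwork);
    mo.initialize(jc, null);          // base operator init, before children exist
    mo.setChildren(job);              // wire up the operator tree
    mo.passExecContext(execContext);  // push the exec context down the tree
    mo.initializeLocalWork(jc);
    mo.initializeMapOperator(jc);     // map-specific init, now a separate step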

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java Wed Mar 18 05:40:07 2015
@@ -93,7 +93,7 @@ public class SparkMergeFileRecordHandler
     row[0] = key;
     row[1] = value;
     try {
-      mergeOp.processOp(row, 0);
+      mergeOp.process(row, 0);
     } catch (HiveException e) {
       abort = true;
       throw new IOException(e);

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java Wed Mar 18 05:40:07 2015
@@ -103,6 +103,7 @@ public class SparkReduceRecordHandler ex
   private List<VectorExpressionWriter>[] valueStringWriters;
   private MapredLocalWork localWork = null;
 
+  @Override
   @SuppressWarnings("unchecked")
   public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
@@ -132,7 +133,7 @@ public class SparkReduceRecordHandler ex
         keyStructInspector = (StructObjectInspector) keyObjectInspector;
         batches = new VectorizedRowBatch[maxTags];
         valueStructInspectors = new StructObjectInspector[maxTags];
-        valueStringWriters = (List<VectorExpressionWriter>[]) new List[maxTags];
+        valueStringWriters = new List[maxTags];
         keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
         buffer = new DataOutputBuffer();
       }
@@ -196,7 +197,7 @@ public class SparkReduceRecordHandler ex
     localWork = gWork.getMapRedLocalWork();
     execContext.setJc(jc);
     execContext.setLocalWork(localWork);
-    reducer.setExecContext(execContext);
+    reducer.passExecContext(execContext);
 
     reducer.setReporter(rp);
     OperatorUtils.setChildrenCollector(
@@ -318,7 +319,7 @@ public class SparkReduceRecordHandler ex
         logMemoryInfo();
       }
       try {
-        reducer.processOp(row, tag);
+        reducer.process(row, tag);
       } catch (Exception e) {
         String rowString = null;
         try {
@@ -360,7 +361,7 @@ public class SparkReduceRecordHandler ex
         rowIdx++;
         if (rowIdx >= BATCH_SIZE) {
           VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-          reducer.processOp(batch, tag);
+          reducer.process(batch, tag);
           rowIdx = 0;
           if (isLogInfoEnabled) {
             logMemoryInfo();
@@ -369,7 +370,7 @@ public class SparkReduceRecordHandler ex
       }
       if (rowIdx > 0) {
         VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-        reducer.processOp(batch, tag);
+        reducer.process(batch, tag);
       }
       if (isLogInfoEnabled) {
         logMemoryInfo();
@@ -401,6 +402,7 @@ public class SparkReduceRecordHandler ex
     }
   }
 
+  @Override
   public void close() {
 
     // No row was processed

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Wed Mar 18 05:40:07 2015
@@ -19,12 +19,10 @@ package org.apache.hadoop.hive.ql.exec.s
 
 import java.io.File;
 import java.io.IOException;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.URL;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -96,8 +94,8 @@ public class SparkUtilities {
       return null;
     }
 
-    String[] splits = uri.getPath().split(File.separator);
-    return  splits[splits.length-1];
+    String name = FilenameUtils.getName(uri.getPath());
+    return name;
   }
 
   public static SparkSession getSparkSession(HiveConf conf,
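
The FilenameUtils change is more than a tidy-up: File.separator is "\" on Windows, which both fails to match the "/" separators in a URI path and is an invalid regex for String.split() (it throws PatternSyntaxException). FilenameUtils.getName() handles either separator. A standalone check:

    import org.apache.commons.io.FilenameUtils;

    public class LastSegmentDemo {
      public static void main(String[] args) {
        // URI paths always use '/', regardless of platform.
        String path = "/tmp/jars/hive-exec.jar";
        System.out.println(FilenameUtils.getName(path));  // hive-exec.jar
      }
    }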

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Wed Mar 18 05:40:07 2015
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec.s
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
@@ -77,7 +76,7 @@ abstract class SparkJobMonitor {
       final int failed = progress.getFailedTaskCount();
       String stageName = "Stage-" + s;
       if (total <= 0) {
-        reportBuffer.append(String.format("%s: -/-\t", stageName, complete, total));
+        reportBuffer.append(String.format("%s: -/-\t", stageName));
       } else {
         if (complete == total && !completed.contains(s)) {
           completed.add(s);
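
Minor but worth a note: java.util.Formatter silently ignores arguments beyond those consumed by the format specifiers, so the old call never misbehaved at runtime -- the fix just removes the misleading surplus arguments:

    // Before: extra args silently ignored by the formatter.
    String before = String.format("%s: -/-\t", "Stage-1", 3, 10);
    // After: arguments match the specifiers exactly.
    String after  = String.format("%s: -/-\t", "Stage-1");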

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Wed Mar 18 05:40:07 2015
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -26,11 +27,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
@@ -43,6 +42,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.Writable;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
@@ -54,30 +54,29 @@ public class HashTableLoader implements
 
   private static final Log LOG = LogFactory.getLog(HashTableLoader.class.getName());
 
-  private ExecMapperContext context;
   private Configuration hconf;
   private MapJoinDesc desc;
+  private TezContext tezContext;
 
   @Override
-  public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
-    this.context = context;
+  public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
+      MapJoinOperator joinOp) {
+    this.tezContext = (TezContext) mrContext;
     this.hconf = hconf;
     this.desc = joinOp.getConf();
   }
 
   @Override
-  public void load(
-      MapJoinTableContainer[] mapJoinTables,
-      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
+  public void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage)
+      throws HiveException {
 
-    TezContext tezContext = (TezContext) MapredContext.get();
     Map<Integer, String> parentToInput = desc.getParentToInput();
     Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
 
     boolean useOptimizedTables = HiveConf.getBoolVar(
         hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
     boolean isFirstKey = true;
-    TezCacheAccess tezCacheAccess = TezCacheAccess.createInstance(hconf);
     for (int pos = 0; pos < mapJoinTables.length; pos++) {
       if (pos == desc.getPosBigTable()) {
         continue;
@@ -87,6 +86,14 @@ public class HashTableLoader implements
       LogicalInput input = tezContext.getInput(inputName);
 
       try {
+        input.start();
+        tezContext.getTezProcessorContext().waitForAnyInputReady(
+            Collections.<Input> singletonList(input));
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+
+      try {
         KeyValueReader kvReader = (KeyValueReader) input.getReader();
         MapJoinObjectSerDeContext keyCtx = mapJoinTableSerdes[pos].getKeyContext(),
           valCtx = mapJoinTableSerdes[pos].getValueContext();
@@ -122,14 +129,6 @@ public class HashTableLoader implements
       } catch (Exception e) {
         throw new HiveException(e);
       }
-      // Register that the Input has been cached.
-      LOG.info("Is this a bucket map join: " + desc.isBucketMapJoin());
-      // cache is disabled for bucket map join because of the same reason
-      // given in loadHashTable in MapJoinOperator.
-      if (!desc.isBucketMapJoin()) {
-        tezCacheAccess.registerCachedInput(inputName);
-        LOG.info("Setting Input: " + inputName + " as cached");
-      }
     }
   }
 

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java Wed Mar 18 05:40:07 2015
@@ -21,6 +21,10 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
@@ -46,6 +50,8 @@ public class LlapObjectCache implements
 
   private static final ReentrantLock lock = new ReentrantLock();
 
+  private static ExecutorService staticPool = Executors.newCachedThreadPool();
+
   private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
   private static final boolean isLogInfoEnabled = LOG.isInfoEnabled();
 
@@ -58,20 +64,19 @@ public class LlapObjectCache implements
   }
 
   @Override
-  public Object retrieve(String key, Callable<?> fn)
-    throws HiveException {
+  public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
 
-    Object o = null;
+    T value = null;
     ReentrantLock objectLock = null;
 
     lock.lock();
     try {
-      o = registry.getIfPresent(key);
-      if (o != null) {
+      value = (T) registry.getIfPresent(key);
+      if (value != null) {
         if (isLogInfoEnabled) {
           LOG.info("Found " + key + " in cache");
         }
-        return o;
+        return value;
       }
 
       if (locks.containsKey(key)) {
@@ -88,19 +93,19 @@ public class LlapObjectCache implements
     try{
       lock.lock();
       try {
-        o = registry.getIfPresent(key);
-        if (o != null) {
+        value = (T) registry.getIfPresent(key);
+        if (value != null) {
           if (isLogInfoEnabled) {
             LOG.info("Found " + key + " in cache");
           }
-          return o;
+          return value;
         }
       } finally {
         lock.unlock();
       }
 
       try {
-        o = fn.call();
+        value = fn.call();
       } catch (Exception e) {
         throw new HiveException(e);
       }
@@ -111,7 +116,7 @@ public class LlapObjectCache implements
           LOG.info("Caching new object for key: " + key);
         }
 
-        registry.put(key, o);
+        registry.put(key, value);
         locks.remove(key);
       } finally {
         lock.unlock();
@@ -119,6 +124,16 @@ public class LlapObjectCache implements
     } finally {
       objectLock.unlock();
     }
-    return o;
+    return value;
+  }
+
+  @Override
+  public <T> Future<T> retrieveAsync(final String key, final Callable<T> fn) throws HiveException {
+    return staticPool.submit(new Callable<T>() {
+      @Override
+      public T call() throws Exception {
+        return retrieve(key, fn);
+      }
+    });
   }
 }
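
LlapObjectCache keeps its per-key locking for the synchronous path and implements retrieveAsync() by submitting that same retrieve() to a shared cached thread pool, so callers can overlap the (possibly blocking) lookup with other startup work. A minimal standalone model of the async half:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class AsyncCacheDemo {
      private static final ExecutorService pool = Executors.newCachedThreadPool();

      static <T> Future<T> retrieveAsync(final Callable<T> fn) {
        return pool.submit(fn);  // runs fn on the pool; Future resolves to its result
      }

      public static void main(String[] args) throws Exception {
        Future<String> f = retrieveAsync(new Callable<String>() {
          @Override
          public String call() {
            return "expensive plan";   // stands in for cache miss + deserialization
          }
        });
        System.out.println(f.get());   // blocks only until call() finishes
        pool.shutdown();
      }
    }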

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Mar 18 05:40:07 2015
@@ -22,9 +22,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
@@ -55,6 +58,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -106,6 +110,7 @@ public class MapRecordProcessor extends
 
     // create map and fetch operators
     mapWork = (MapWork) cache.retrieve(key, new Callable<Object>() {
+        @Override
         public Object call() {
           return Utilities.getMapWork(jconf);
         }
@@ -127,6 +132,7 @@ public class MapRecordProcessor extends
 	mergeWorkList.add(
           (MapWork) cache.retrieve(key,
               new Callable<Object>() {
+                @Override
                 public Object call() {
                   return Utilities.getMergeWork(jconf, prefix);
                 }
@@ -134,6 +140,10 @@ public class MapRecordProcessor extends
       }
     }
 
+    MapredContext.init(true, new JobConf(jconf));
+    ((TezContext) MapredContext.get()).setInputs(inputs);
+    ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
+
     // Update JobConf using MRInput, info like filename comes via this
     legacyMRInput = getMRInput(inputs);
     if (legacyMRInput != null) {
@@ -162,6 +172,8 @@ public class MapRecordProcessor extends
       }
 
       mapOp.clearConnectedOperators();
+      mapOp.setExecContext(execContext);
+
       if (mergeWorkList != null) {
         MapOperator mergeMapOp = null;
         for (MapWork mergeMapWork : mergeWorkList) {
@@ -177,12 +189,13 @@ public class MapRecordProcessor extends
             mergeMapOp.setConf(mergeMapWork);
             l4j.info("Input name is " + mergeMapWork.getName());
             jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
+            mergeMapOp.initialize(jconf, null);
             mergeMapOp.setChildren(jconf);
 
             DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
 	    mapOp.setConnectedOperators(mergeMapWork.getTag(), dummyOp);
 
-            mergeMapOp.setExecContext(new ExecMapperContext(jconf));
+            mergeMapOp.passExecContext(new ExecMapperContext(jconf));
             mergeMapOp.initializeLocalWork(jconf);
           }
         }
@@ -192,21 +205,19 @@ public class MapRecordProcessor extends
       mapOp.setConf(mapWork);
       l4j.info("Main input name is " + mapWork.getName());
       jconf.set(Utilities.INPUT_NAME, mapWork.getName());
+      mapOp.initialize(jconf, null);
       mapOp.setChildren(jconf);
+      mapOp.passExecContext(execContext);
       l4j.info(mapOp.dump(0));
 
-      MapredContext.init(true, new JobConf(jconf));
-      ((TezContext) MapredContext.get()).setInputs(inputs);
-      ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
-      mapOp.setExecContext(execContext);
       mapOp.initializeLocalWork(jconf);
 
       initializeMapRecordSources();
-      mapOp.initialize(jconf, null);
+      mapOp.initializeMapOperator(jconf);
       if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) {
         for (MapOperator mergeMapOp : mergeMapOpList) {
           jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName());
-          mergeMapOp.initialize(jconf, null);
+          mergeMapOp.initializeMapOperator(jconf);
         }
       }
 
@@ -350,6 +361,17 @@ public class MapRecordProcessor extends
   private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
     // there should be only one MRInput
     MRInputLegacy theMRInput = null;
+
+    // start all mr/multi-mr inputs
+    Set<Input> li = new HashSet<Input>();
+    for (LogicalInput inp: inputs.values()) {
+      if (inp instanceof MRInputLegacy || inp instanceof MultiMRInput) {
+        inp.start();
+        li.add(inp);
+      }
+    }
+    processorContext.waitForAllInputsReady(li);
+
     l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray()));
     for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
       if (inp.getValue() instanceof MRInputLegacy) {
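
Two related moves in MapRecordProcessor: MapredContext.init() and the TezContext wiring are hoisted above operator construction (presumably so anything initialized during setup, such as the new HashTableLoader.init() signature, sees a populated TezContext), and getMRInput() now starts every MRInputLegacy/MultiMRInput up front and blocks until all are ready. The waiting side, condensed from the hunk:

    // From the hunk above: start all MR inputs, then wait for the whole set.
    Set<Input> started = new HashSet<Input>();
    for (LogicalInput in : inputs.values()) {
      if (in instanceof MRInputLegacy || in instanceof MultiMRInput) {
        in.start();
        started.add(in);
      }
    }
    processorContext.waitForAllInputsReady(started);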

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Wed Mar 18 05:40:07 2015
@@ -98,10 +98,11 @@ public class MergeFileRecordProcessor ex
       cacheKey = queryId + MAP_PLAN_KEY;
 
       MapWork mapWork = (MapWork) cache.retrieve(cacheKey, new Callable<Object>() {
-	  public Object call() {
-	    return Utilities.getMapWork(jconf);
-	  }
-	});
+        @Override
+        public Object call() {
+          return Utilities.getMapWork(jconf);
+        }
+      });
       Utilities.setMapWork(jconf, mapWork);
 
       if (mapWork instanceof MergeFileWork) {
@@ -116,7 +117,7 @@ public class MergeFileRecordProcessor ex
 
       MapredContext.init(true, new JobConf(jconf));
       ((TezContext) MapredContext.get()).setInputs(inputs);
-      mergeOp.setExecContext(execContext);
+      mergeOp.passExecContext(execContext);
       mergeOp.initializeLocalWork(jconf);
       mergeOp.initialize(jconf, null);
 
@@ -198,7 +199,7 @@ public class MergeFileRecordProcessor ex
       } else {
         row[0] = key;
         row[1] = value;
-        mergeOp.processOp(row, 0);
+        mergeOp.process(row, 0);
       }
     } catch (Throwable e) {
       abort = true;

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Wed Mar 18 05:40:07 2015
@@ -19,11 +19,14 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.tez.runtime.api.ObjectRegistry;
 
 import com.google.common.base.Preconditions;
 
@@ -41,6 +44,8 @@ public class ObjectCache implements org.
   // before anything else.
   private volatile static ObjectRegistry staticRegistry;
 
+  private static ExecutorService staticPool;
+
   private final ObjectRegistry registry;
 
   public ObjectCache() {
@@ -51,6 +56,7 @@ public class ObjectCache implements org.
 
   public static void setupObjectRegistry(ObjectRegistry objectRegistry) {
     staticRegistry = objectRegistry;
+    staticPool = Executors.newCachedThreadPool();
   }
 
   @Override
@@ -59,21 +65,32 @@ public class ObjectCache implements org.
     LOG.info("Releasing key: " + key);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public Object retrieve(String key, Callable<?> fn) throws HiveException {
-    Object o;
+  public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
+    T value;
     try {
-      o = registry.get(key);
-      if (o == null) {
-	o = fn.call();
-	LOG.info("Caching key: " + key);
-	registry.cacheForVertex(key, o);
+      value = (T) registry.get(key);
+      if (value == null) {
+        value = fn.call();
+        LOG.info("Caching key: " + key);
+        registry.cacheForVertex(key, value);
       } else {
-	LOG.info("Found " + key + " in cache with value: " + o);
+        LOG.info("Found " + key + " in cache with value: " + value);
       }
     } catch (Exception e) {
       throw new HiveException(e);
     }
-    return o;
+    return value;
+  }
+
+  @Override
+  public <T> Future<T> retrieveAsync(final String key, final Callable<T> fn) throws HiveException {
+    return staticPool.submit(new Callable<T>() {
+      @Override
+      public T call() throws Exception {
+        return retrieve(key, fn);
+      }
+    });
   }
 }
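
The Tez ObjectCache follows the same pattern: retrieve() is generified (the unchecked cast from the registry is acknowledged with @SuppressWarnings), and retrieveAsync() submits to a static pool created in setupObjectRegistry() -- consistent with the existing comment that the registry must be set up before anything else. Call sites that pass a typed Callable no longer need a cast, though the Callable<Object> call sites in this patch still do:

    // Hypothetical call site against the generified API; with a typed
    // Callable the result needs no cast. Exception handling omitted.
    MapWork work = cache.retrieve(key, new Callable<MapWork>() {
      @Override
      public MapWork call() {
        return Utilities.getMapWork(jconf);
      }
    });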

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1667456&r1=1667455&r2=1667456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Mar 18 05:40:07 2015
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -92,6 +93,7 @@ public class ReduceRecordProcessor  exte
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
     cacheKey = queryId + processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
     redWork = (ReduceWork) cache.retrieve(cacheKey, new Callable<Object>() {
+        @Override
         public Object call() {
           return Utilities.getReduceWork(jconf);
         }
@@ -110,9 +112,14 @@ public class ReduceRecordProcessor  exte
     for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
       TableDesc keyTableDesc = redWork.getKeyDesc();
       TableDesc valueTableDesc = redWork.getTagToValueDesc().get(tag);
-      KeyValuesReader reader =
-          (KeyValuesReader) inputs.get(redWork.getTagToInput().get(tag)).getReader();
 
+      // make the reader ready for prime time
+      Input input = inputs.get(redWork.getTagToInput().get(tag));
+      input.start();
+      processorContext.waitForAnyInputReady(Collections.singleton(input));
+      KeyValuesReader reader = (KeyValuesReader) input.getReader();
+
+      // now we can setup the record source
       sources[tag] = new ReduceRecordSource();
       sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc,
           reader, tag == position, (byte) tag,


