hive-commits mailing list archives

From gunt...@apache.org
Subject svn commit: r1667454 [2/3] - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/mr/ java/org/apache/hadoop/hive/ql/exec/spark/ java/org/apache/hadoop/hive/ql/exec/tez/ java/org/apache/hadoop/hive/ql/exec/vec...
Date Wed, 18 Mar 2015 05:03:25 GMT
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Wed Mar 18 05:03:23 2015
@@ -19,12 +19,13 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Stack;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -66,7 +67,8 @@ public class PTFOperator extends Operato
    * 4. Create input partition to store rows coming from previous operator
    */
   @Override
-  protected void initializeOp(Configuration jobConf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration jobConf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(jobConf);
     hiveConf = jobConf;
     isMapOperator = conf.isMapSide();
 
@@ -84,8 +86,7 @@ public class PTFOperator extends Operato
     ptfInvocation = setupChain();
     ptfInvocation.initializeStreaming(jobConf, isMapOperator);
     firstMapRow = true;
-
-    super.initializeOp(jobConf);
+    return result;
   }
 
   @Override
@@ -96,7 +97,7 @@ public class PTFOperator extends Operato
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     if (!isMapOperator ) {
       /*
        * checkif current row belongs to the current accumulated Partition:
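
[Editor's note] The PTFOperator changes above follow the pattern applied across this commit: initializeOp() now returns the Collection<Future<?>> obtained from super.initializeOp(Configuration), child initialization is handled by the base Operator (so the trailing super.initializeOp()/initializeChildren() calls are dropped), and the per-row entry point processOp() is renamed to process(). A minimal sketch of an operator written against the new contract; ExampleOperator and ExampleDesc are hypothetical names used only for illustration (ExampleDesc would extend OperatorDesc), and the class is left abstract so that Operator members not shown here, such as getType(), need not be spelled out:

    package org.apache.hadoop.hive.ql.exec;

    import java.util.Collection;
    import java.util.concurrent.Future;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    // Hypothetical operator illustrating the updated Operator contract in this commit.
    public abstract class ExampleOperator extends Operator<ExampleDesc> {

      @Override
      protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
        // Call super first and keep whatever asynchronous initialization work it started.
        Collection<Future<?>> result = super.initializeOp(hconf);
        // ... operator-specific setup goes here ...
        return result;  // child operators are initialized by the base Operator, not here
      }

      @Override
      public void process(Object row, int tag) throws HiveException {
        // Per-row work; this method was processOp(Object, int) before this commit.
        forward(row, inputObjInspectors[tag]);
      }
    }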

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java Wed Mar 18 05:03:23 2015
@@ -43,7 +43,7 @@ public class RCFileMergeOperator
   int columnNumber = 0;
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     Object[] keyValue = (Object[]) row;
     processKeyValuePairs(keyValue[0], keyValue[1]);
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Wed Mar 18 05:03:23 2015
@@ -18,12 +18,16 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -48,14 +52,12 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.hash.MurmurHash;
 
-import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
-
 /**
  * Reduce Sink Operator sends output to the reduce stage.
  **/
@@ -153,7 +155,8 @@ public class ReduceSinkOperator extends
   private final transient LongWritable recordCounter = new LongWritable();
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     try {
 
       numRows = 0;
@@ -237,12 +240,12 @@ public class ReduceSinkOperator extends
       useUniformHash = conf.getReducerTraits().contains(UNIFORM);
 
       firstRow = true;
-      initializeChildren(hconf);
     } catch (Exception e) {
       String msg = "Error initializing ReduceSinkOperator: " + e.getMessage();
       LOG.error(msg, e);
       throw new RuntimeException(e);
     }
+    return result;
   }
 
 
@@ -291,7 +294,7 @@ public class ReduceSinkOperator extends
 
   @Override
   @SuppressWarnings("unchecked")
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     try {
       ObjectInspector rowInspector = inputObjInspectors[tag];
       if (firstRow) {
@@ -514,6 +517,7 @@ public class ReduceSinkOperator extends
     return keyWritable;
   }
 
+  @Override
   public void collect(byte[] key, byte[] value, int hash) throws IOException {
     HiveKey keyWritable = new HiveKey(key, hash);
     BytesWritable valueWritable = new BytesWritable(value);
@@ -608,6 +612,7 @@ public class ReduceSinkOperator extends
     return inputAliases;
   }
 
+  @Override
   public void setOutputCollector(OutputCollector _out) {
     this.out = _out;
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Wed Mar 18 05:03:23 2015
@@ -21,10 +21,12 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -91,7 +93,7 @@ public class SMBMapJoinOperator extends
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
 
     // If there is a sort-merge join followed by a regular join, the SMBJoinOperator may not
     // get initialized at all. Consider the following query:
@@ -99,7 +101,7 @@ public class SMBMapJoinOperator extends
     // For the mapper processing C, The SMJ is not initialized, no need to close it either.
     initDone = true;
 
-    super.initializeOp(hconf);
+    Collection<Future<?>> result = super.initializeOp(hconf);
 
     closeCalled = false;
 
@@ -154,6 +156,7 @@ public class SMBMapJoinOperator extends
       }
       foundNextKeyGroup[pos] = false;
     }
+    return result;
   }
 
   @Override
@@ -195,7 +198,7 @@ public class SMBMapJoinOperator extends
       HiveInputFormat.pushFilters(jobClone, ts);
 
 
-      ts.setExecContext(getExecContext());
+      ts.passExecContext(getExecContext());
 
       FetchOperator fetchOp = new FetchOperator(fetchWork, jobClone);
       ts.initialize(jobClone, new ObjectInspector[]{fetchOp.getOutputObjectInspector()});
@@ -231,7 +234,7 @@ public class SMBMapJoinOperator extends
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
 
     if (tag == posBigTable) {
       if (inputFileChanged) {
@@ -555,7 +558,7 @@ public class SMBMapJoinOperator extends
         fetchDone[tag] = true;
         return;
       }
-      forwardOp.processOp(row.o, tag);
+      forwardOp.process(row.o, tag);
       // check if any operator had a fatal error or early exit during
       // execution
       if (forwardOp.getDone()) {
@@ -803,7 +806,7 @@ public class SMBMapJoinOperator extends
 
         // Pass the row though the operator tree. It is guaranteed that not more than 1 row can
         // be produced from a input row.
-        forwardOp.processOp(nextRow.o, 0);
+        forwardOp.process(nextRow.o, 0);
         nextRow = sinkOp.getResult();
 
         // It is possible that the row got absorbed in the operator tree.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Wed Mar 18 05:03:23 2015
@@ -27,12 +27,14 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -259,7 +261,8 @@ public class ScriptOperator extends Oper
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     firstRow = true;
 
     statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
@@ -280,11 +283,10 @@ public class ScriptOperator extends Oper
 
       outputObjInspector = scriptOutputDeserializer.getObjectInspector();
 
-      // initialize all children before starting the script
-      initializeChildren(hconf);
     } catch (Exception e) {
       throw new HiveException(ErrorMsg.SCRIPT_INIT_ERROR.getErrorCodedMsg(), e);
     }
+    return result;
   }
 
   boolean isBrokenPipeException(IOException e) {
@@ -321,7 +323,7 @@ public class ScriptOperator extends Oper
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     // initialize the user's process only when you receive the first row
     if (firstRow) {
       firstRow = false;
@@ -573,6 +575,7 @@ public class ScriptOperator extends Oper
       this.rowInspector = rowInspector;
     }
 
+    @Override
     public void processLine(Writable line) throws HiveException {
       try {
         row = scriptOutputDeserializer.deserialize(line);
@@ -583,6 +586,7 @@ public class ScriptOperator extends Oper
       forward(row, rowInspector);
     }
 
+    @Override
     public void close() {
     }
   }
@@ -651,6 +655,7 @@ public class ScriptOperator extends Oper
       }
     }
 
+    @Override
     public void processLine(Writable line) throws HiveException {
 
       String stringLine = line.toString();
@@ -693,6 +698,7 @@ public class ScriptOperator extends Oper
       bytesCopied += len;
     }
 
+    @Override
     public void close() {
     }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Wed Mar 18 05:03:23 2015
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,12 +44,12 @@ public class SelectOperator extends Oper
   private transient boolean isSelectStarNoCompute = false;
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     // Just forward the row as is
     if (conf.isSelStarNoCompute()) {
-      initializeChildren(hconf);
       isSelectStarNoCompute = true;
-      return;
+      return result;
     }
     List<ExprNodeDesc> colList = conf.getColList();
     eval = new ExprNodeEvaluator[colList.size()];
@@ -64,11 +66,11 @@ public class SelectOperator extends Oper
     }
     outputObjInspector = initEvaluatorsAndReturnStruct(eval, conf.getOutputColumnNames(),
         inputObjInspectors[0]);
-    initializeChildren(hconf);
+    return result;
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     if (isSelectStarNoCompute) {
       forward(row, inputObjInspectors[tag]);
       return;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java Wed Mar 18 05:03:23 2015
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.Future;
 
 import org.apache.commons.io.FileExistsException;
 import org.apache.commons.logging.Log;
@@ -50,7 +52,7 @@ public class SparkHashTableSinkOperator
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   protected static final Log LOG = LogFactory.getLog(SparkHashTableSinkOperator.class.getName());
 
-  private HashTableSinkOperator htsOperator;
+  private final HashTableSinkOperator htsOperator;
 
   // The position of this table
   private byte tag;
@@ -60,18 +62,20 @@ public class SparkHashTableSinkOperator
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     ObjectInspector[] inputOIs = new ObjectInspector[conf.getTagLength()];
     inputOIs[tag] = inputObjInspectors[0];
     conf.setTagOrder(new Byte[]{ tag });
     htsOperator.setConf(conf);
     htsOperator.initialize(hconf, inputOIs);
+    return result;
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     // Ignore the tag passed in, which should be 0, not what we want
-    htsOperator.processOp(row, this.tag);
+    htsOperator.process(row, this.tag);
   }
 
   @Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Wed Mar 18 05:03:23 2015
@@ -20,9 +20,11 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -84,7 +86,7 @@ public class TableScanOperator extends O
    * operator will be enhanced to read the table.
    **/
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     if (rowLimit >= 0 && currCount++ >= rowLimit) {
       setDone(true);
       return;
@@ -153,9 +155,9 @@ public class TableScanOperator extends O
           values.add(o == null ? defaultPartitionName : o.toString());
         }
         partitionSpecs = FileUtils.makePartName(conf.getPartColumns(), values);
-	if (isLogInfoEnabled) {
-	  LOG.info("Stats Gathering found a new partition spec = " + partitionSpecs);
-	}
+        if (isLogInfoEnabled) {
+          LOG.info("Stats Gathering found a new partition spec = " + partitionSpecs);
+        }
       }
       // find which column contains the raw data size (both partitioned and non partitioned
       int uSizeColumn = -1;
@@ -191,16 +193,17 @@ public class TableScanOperator extends O
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
-    initializeChildren(hconf);
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     inputFileChanged = false;
 
     if (conf == null) {
-      return;
+      return result;
     }
+
     rowLimit = conf.getRowLimit();
     if (!conf.isGatherStats()) {
-      return;
+      return result;
     }
 
     this.hconf = hconf;
@@ -216,9 +219,9 @@ public class TableScanOperator extends O
     stats = new HashMap<String, Stat>();
     if (conf.getPartColumns() == null || conf.getPartColumns().size() == 0) {
       // NON PARTITIONED table
-      return;
+      return result;
     }
-
+    return result;
   }
 
   @Override
@@ -282,7 +285,7 @@ public class TableScanOperator extends O
     if (!statsPublisher.connect(jc)) {
       // just return, stats gathering should not block the main query.
       if (isLogInfoEnabled) {
-	LOG.info("StatsPublishing error: cannot connect to database.");
+        LOG.info("StatsPublishing error: cannot connect to database.");
       }
       if (isStatsReliable) {
         throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java Wed Mar 18 05:03:23 2015
@@ -33,8 +33,8 @@ public class TezDummyStoreOperator exten
    * the records.
    */
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
-    super.processOp(row, tag);
+  public void process(Object row, int tag) throws HiveException {
+    super.process(row, tag);
     forward(result.o, outputObjInspector);
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java Wed Mar 18 05:03:23 2015
@@ -20,7 +20,9 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -57,7 +59,8 @@ public class UDTFOperator extends Operat
   transient AutoProgressor autoProgressor;
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     genericUDTF = conf.getGenericUDTF();
     collector = new UDTFCollector(this);
 
@@ -90,13 +93,11 @@ public class UDTFOperator extends Operat
               hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS));
       autoProgressor.go();
     }
-
-    // Initialize the rest of the operator DAG
-    super.initializeOp(hconf);
+    return result;
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     // The UDTF expects arguments in an object[]
     StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
     List<? extends StructField> fields = soi.getAllStructFieldRefs();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Wed Mar 18 05:03:23 2015
@@ -20,7 +20,9 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -54,7 +56,8 @@ public class UnionOperator extends Opera
    * needsTransform[].
    */
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
 
     int parents = parentOperators.size();
     parentObjInspectors = new StructObjectInspector[parents];
@@ -116,11 +119,11 @@ public class UnionOperator extends Opera
             + "] from " + inputObjInspectors[p] + " to " + outputObjInspector);
       }
     }
-    initializeChildren(hconf);
+    return result;
   }
 
   @Override
-  public synchronized void processOp(Object row, int tag) throws HiveException {
+  public synchronized void process(Object row, int tag) throws HiveException {
 
     StructObjectInspector soi = parentObjInspectors[tag];
     List<? extends StructField> fields = parentFields[tag];

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Wed Mar 18 05:03:23 2015
@@ -24,9 +24,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -105,6 +105,7 @@ public class ExecMapper extends MapReduc
       }
       mo.setConf(mrwork);
       // initialize map operator
+      mo.initialize(job, null);
       mo.setChildren(job);
       l4j.info(mo.dump(0));
       // initialize map local work
@@ -113,9 +114,9 @@ public class ExecMapper extends MapReduc
 
       MapredContext.init(true, new JobConf(jc));
 
-      mo.setExecContext(execContext);
+      mo.passExecContext(execContext);
       mo.initializeLocalWork(jc);
-      mo.initialize(jc, null);
+      mo.initializeMapOperator(jc);
 
       if (localWork == null) {
         return;
@@ -126,7 +127,7 @@ public class ExecMapper extends MapReduc
       l4j.info("Initializing dummy operator");
       List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
       for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
-        dummyOp.setExecContext(execContext);
+        dummyOp.passExecContext(execContext);
         dummyOp.initialize(jc,null);
       }
     } catch (Throwable e) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Wed Mar 18 05:03:23 2015
@@ -232,7 +232,7 @@ public class ExecReducer extends MapRedu
         row.add(valueObject[tag]);
 
         try {
-          reducer.processOp(row, tag);
+          reducer.process(row, tag);
         } catch (Exception e) {
           String rowString = null;
           try {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java Wed Mar 18 05:03:23 2015
@@ -32,11 +32,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -62,7 +61,8 @@ public class HashTableLoader implements
   private MapJoinDesc desc;
 
   @Override
-  public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
+  public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
+      MapJoinOperator joinOp) {
     this.context = context;
     this.hconf = hconf;
     this.joinOp = joinOp;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Wed Mar 18 05:03:23 2015
@@ -404,7 +404,7 @@ public class MapredLocalTask extends Tas
         if (row == null) {
           break;
         }
-        forwardOp.processOp(row.o, 0);
+        forwardOp.process(row.o, 0);
       }
       forwardOp.flush();
     }
@@ -445,7 +445,7 @@ public class MapredLocalTask extends Tas
       Operator<? extends OperatorDesc> forwardOp = work.getAliasToWork().get(alias);
 
       // put the exe context into all the operators
-      forwardOp.setExecContext(execContext);
+      forwardOp.passExecContext(execContext);
       // All the operators need to be initialized before process
       FetchOperator fetchOp = entry.getValue();
       JobConf jobConf = fetchOpJobConfMap.get(fetchOp);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java Wed Mar 18 05:03:23 2015
@@ -19,6 +19,10 @@
 package org.apache.hadoop.hive.ql.exec.mr;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,18 +36,59 @@ import org.apache.hadoop.hive.ql.metadat
 public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
 
   private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
+  private static final boolean isInfoEnabled = LOG.isInfoEnabled();
 
   @Override
   public void release(String key) {
     // nothing to do
+    if (isInfoEnabled) {
+      LOG.info(key + " no longer needed");
+    }
   }
 
   @Override
-  public Object retrieve(String key, Callable<?> fn) throws HiveException {
+  public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
     try {
+      if (isInfoEnabled) {
+        LOG.info("Creating " + key);
+      }
       return fn.call();
     } catch (Exception e) {
       throw new HiveException(e);
     }
   }
+
+  @Override
+  public <T> Future<T> retrieveAsync(String key, Callable<T> fn) throws HiveException {
+    final T value = retrieve(key, fn);
+
+    return new Future<T>() {
+
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return false;
+      }
+
+      @Override
+      public boolean isDone() {
+        return true;
+      }
+
+      @Override
+      public T get() throws InterruptedException, ExecutionException {
+        return value;
+      }
+
+      @Override
+      public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
+          TimeoutException {
+        return value;
+      }
+    };
+  }
 }
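
[Editor's note] The MapReduce ObjectCache has no real cross-task cache, so the new retrieveAsync() above computes the value synchronously through retrieve() and wraps it in an already-completed Future. A hedged sketch of a caller of this generic API; fetchPlan is a hypothetical helper, while ObjectCache, Utilities.getMapWork() and MapWork are the Hive types used elsewhere in this commit:

    // Hypothetical usage of the generic cache API introduced by this commit (MR implementation).
    static MapWork fetchPlan(final JobConf jconf) throws Exception {
      ObjectCache cache = new ObjectCache();
      Future<MapWork> work = cache.retrieveAsync("__MAP_PLAN__", new Callable<MapWork>() {
        @Override
        public MapWork call() {
          return Utilities.getMapWork(jconf);  // runs synchronously in the MR case
        }
      });
      return work.get();  // the returned Future is already done, so this does not block
    }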

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java Wed Mar 18 05:03:23 2015
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -58,7 +59,8 @@ public class HashTableLoader implements
   private MapJoinDesc desc;
 
   @Override
-  public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
+  public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
+      MapJoinOperator joinOp) {
     this.context = context;
     this.hconf = hconf;
     this.joinOp = joinOp;
@@ -66,9 +68,9 @@ public class HashTableLoader implements
   }
 
   @Override
-  public void load(
-      MapJoinTableContainer[] mapJoinTables,
-      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
+  public void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage)
+      throws HiveException {
 
     // Note: it's possible that a MJ operator is in a ReduceWork, in which case the
     // currentInputPath will be null. But, since currentInputPath is only interesting

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Wed Mar 18 05:03:23 2015
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
@@ -29,7 +33,6 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -38,10 +41,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
 
 /**
  * Clone from ExecMapper. SparkMapRecordHandler is the bridge between the spark framework and
@@ -61,6 +60,7 @@ public class SparkMapRecordHandler exten
   private boolean isLogInfoEnabled = false;
   private ExecMapperContext execContext;
 
+  @Override
   public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
     super.init(job, output, reporter);
@@ -81,6 +81,7 @@ public class SparkMapRecordHandler exten
       mo.setConf(mrwork);
 
       // initialize map operator
+      mo.initialize(jc, null);
       mo.setChildren(job);
       LOG.info(mo.dump(0));
       // initialize map local work
@@ -90,9 +91,9 @@ public class SparkMapRecordHandler exten
       MapredContext.init(true, new JobConf(jc));
       MapredContext.get().setReporter(reporter);
 
-      mo.setExecContext(execContext);
+      mo.passExecContext(execContext);
       mo.initializeLocalWork(jc);
-      mo.initialize(jc, null);
+      mo.initializeMapOperator(jc);
 
       OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
       mo.setReporter(rp);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java Wed Mar 18 05:03:23 2015
@@ -93,7 +93,7 @@ public class SparkMergeFileRecordHandler
     row[0] = key;
     row[1] = value;
     try {
-      mergeOp.processOp(row, 0);
+      mergeOp.process(row, 0);
     } catch (HiveException e) {
       abort = true;
       throw new IOException(e);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java Wed Mar 18 05:03:23 2015
@@ -103,6 +103,7 @@ public class SparkReduceRecordHandler ex
   private List<VectorExpressionWriter>[] valueStringWriters;
   private MapredLocalWork localWork = null;
 
+  @Override
   @SuppressWarnings("unchecked")
   public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
@@ -132,7 +133,7 @@ public class SparkReduceRecordHandler ex
         keyStructInspector = (StructObjectInspector) keyObjectInspector;
         batches = new VectorizedRowBatch[maxTags];
         valueStructInspectors = new StructObjectInspector[maxTags];
-        valueStringWriters = (List<VectorExpressionWriter>[]) new List[maxTags];
+        valueStringWriters = new List[maxTags];
         keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
         buffer = new DataOutputBuffer();
       }
@@ -196,7 +197,7 @@ public class SparkReduceRecordHandler ex
     localWork = gWork.getMapRedLocalWork();
     execContext.setJc(jc);
     execContext.setLocalWork(localWork);
-    reducer.setExecContext(execContext);
+    reducer.passExecContext(execContext);
 
     reducer.setReporter(rp);
     OperatorUtils.setChildrenCollector(
@@ -318,7 +319,7 @@ public class SparkReduceRecordHandler ex
         logMemoryInfo();
       }
       try {
-        reducer.processOp(row, tag);
+        reducer.process(row, tag);
       } catch (Exception e) {
         String rowString = null;
         try {
@@ -360,7 +361,7 @@ public class SparkReduceRecordHandler ex
         rowIdx++;
         if (rowIdx >= BATCH_SIZE) {
           VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-          reducer.processOp(batch, tag);
+          reducer.process(batch, tag);
           rowIdx = 0;
           if (isLogInfoEnabled) {
             logMemoryInfo();
@@ -369,7 +370,7 @@ public class SparkReduceRecordHandler ex
       }
       if (rowIdx > 0) {
         VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-        reducer.processOp(batch, tag);
+        reducer.process(batch, tag);
       }
       if (isLogInfoEnabled) {
         logMemoryInfo();
@@ -401,6 +402,7 @@ public class SparkReduceRecordHandler ex
     }
   }
 
+  @Override
   public void close() {
 
     // No row was processed

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Wed Mar 18 05:03:23 2015
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -26,11 +27,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
@@ -43,6 +42,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.Writable;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
@@ -54,30 +54,29 @@ public class HashTableLoader implements
 
   private static final Log LOG = LogFactory.getLog(HashTableLoader.class.getName());
 
-  private ExecMapperContext context;
   private Configuration hconf;
   private MapJoinDesc desc;
+  private TezContext tezContext;
 
   @Override
-  public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
-    this.context = context;
+  public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
+      MapJoinOperator joinOp) {
+    this.tezContext = (TezContext) mrContext;
     this.hconf = hconf;
     this.desc = joinOp.getConf();
   }
 
   @Override
-  public void load(
-      MapJoinTableContainer[] mapJoinTables,
-      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
+  public void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage)
+      throws HiveException {
 
-    TezContext tezContext = (TezContext) MapredContext.get();
     Map<Integer, String> parentToInput = desc.getParentToInput();
     Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
 
     boolean useOptimizedTables = HiveConf.getBoolVar(
         hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
     boolean isFirstKey = true;
-    TezCacheAccess tezCacheAccess = TezCacheAccess.createInstance(hconf);
     for (int pos = 0; pos < mapJoinTables.length; pos++) {
       if (pos == desc.getPosBigTable()) {
         continue;
@@ -87,6 +86,14 @@ public class HashTableLoader implements
       LogicalInput input = tezContext.getInput(inputName);
 
       try {
+        input.start();
+        tezContext.getTezProcessorContext().waitForAnyInputReady(
+            Collections.<Input> singletonList(input));
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+
+      try {
         KeyValueReader kvReader = (KeyValueReader) input.getReader();
         MapJoinObjectSerDeContext keyCtx = mapJoinTableSerdes[pos].getKeyContext(),
           valCtx = mapJoinTableSerdes[pos].getValueContext();
@@ -122,14 +129,6 @@ public class HashTableLoader implements
       } catch (Exception e) {
         throw new HiveException(e);
       }
-      // Register that the Input has been cached.
-      LOG.info("Is this a bucket map join: " + desc.isBucketMapJoin());
-      // cache is disabled for bucket map join because of the same reason
-      // given in loadHashTable in MapJoinOperator.
-      if (!desc.isBucketMapJoin()) {
-        tezCacheAccess.registerCachedInput(inputName);
-        LOG.info("Setting Input: " + inputName + " as cached");
-      }
     }
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Mar 18 05:03:23 2015
@@ -22,9 +22,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 
@@ -55,6 +57,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -98,6 +101,7 @@ public class MapRecordProcessor extends
 
     // create map and fetch operators
     mapWork = (MapWork) cache.retrieve(key, new Callable<Object>() {
+        @Override
         public Object call() {
           return Utilities.getMapWork(jconf);
         }
@@ -119,6 +123,7 @@ public class MapRecordProcessor extends
 	mergeWorkList.add(
           (MapWork) cache.retrieve(key,
               new Callable<Object>() {
+                @Override
                 public Object call() {
                   return Utilities.getMergeWork(jconf, prefix);
                 }
@@ -133,6 +138,10 @@ public class MapRecordProcessor extends
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
 
+    MapredContext.init(true, new JobConf(jconf));
+    ((TezContext) MapredContext.get()).setInputs(inputs);
+    ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
+
     // Update JobConf using MRInput, info like filename comes via this
     legacyMRInput = getMRInput(inputs);
     if (legacyMRInput != null) {
@@ -160,6 +169,8 @@ public class MapRecordProcessor extends
         mapOp = new MapOperator();
       }
 
+      mapOp.setExecContext(execContext);
+
       connectOps.clear();
       if (mergeWorkList != null) {
         MapOperator mergeMapOp = null;
@@ -176,12 +187,13 @@ public class MapRecordProcessor extends
             mergeMapOp.setConf(mergeMapWork);
             l4j.info("Input name is " + mergeMapWork.getName());
             jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
+            mergeMapOp.initialize(jconf, null);
             mergeMapOp.setChildren(jconf);
 
             DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
             connectOps.put(mergeMapWork.getTag(), dummyOp);
 
-            mergeMapOp.setExecContext(new ExecMapperContext(jconf));
+            mergeMapOp.passExecContext(new ExecMapperContext(jconf));
             mergeMapOp.initializeLocalWork(jconf);
           }
         }
@@ -191,21 +203,19 @@ public class MapRecordProcessor extends
       mapOp.setConf(mapWork);
       l4j.info("Main input name is " + mapWork.getName());
       jconf.set(Utilities.INPUT_NAME, mapWork.getName());
+      mapOp.initialize(jconf, null);
       mapOp.setChildren(jconf);
+      mapOp.passExecContext(execContext);
       l4j.info(mapOp.dump(0));
 
-      MapredContext.init(true, new JobConf(jconf));
-      ((TezContext) MapredContext.get()).setInputs(inputs);
-      ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
-      mapOp.setExecContext(execContext);
       mapOp.initializeLocalWork(jconf);
 
       initializeMapRecordSources();
-      mapOp.initialize(jconf, null);
+      mapOp.initializeMapOperator(jconf);
       if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) {
         for (MapOperator mergeMapOp : mergeMapOpList) {
           jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName());
-          mergeMapOp.initialize(jconf, null);
+          mergeMapOp.initializeMapOperator(jconf);
         }
       }
 
@@ -353,6 +363,17 @@ public class MapRecordProcessor extends
   private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
     // there should be only one MRInput
     MRInputLegacy theMRInput = null;
+
+    // start all mr/multi-mr inputs
+    Set<Input> li = new HashSet<Input>();
+    for (LogicalInput inp: inputs.values()) {
+      if (inp instanceof MRInputLegacy || inp instanceof MultiMRInput) {
+        inp.start();
+        li.add(inp);
+      }
+    }
+    processorContext.waitForAllInputsReady(li);
+
     l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray()));
     for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
       if (inp.getValue() instanceof MRInputLegacy) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Wed Mar 18 05:03:23 2015
@@ -98,10 +98,11 @@ public class MergeFileRecordProcessor ex
       cacheKey = queryId + MAP_PLAN_KEY;
 
       MapWork mapWork = (MapWork) cache.retrieve(cacheKey, new Callable<Object>() {
-	  public Object call() {
-	    return Utilities.getMapWork(jconf);
-	  }
-	});
+        @Override
+        public Object call() {
+          return Utilities.getMapWork(jconf);
+        }
+      });
       Utilities.setMapWork(jconf, mapWork);
 
       if (mapWork instanceof MergeFileWork) {
@@ -116,7 +117,7 @@ public class MergeFileRecordProcessor ex
 
       MapredContext.init(true, new JobConf(jconf));
       ((TezContext) MapredContext.get()).setInputs(inputs);
-      mergeOp.setExecContext(execContext);
+      mergeOp.passExecContext(execContext);
       mergeOp.initializeLocalWork(jconf);
       mergeOp.initialize(jconf, null);
 
@@ -198,7 +199,7 @@ public class MergeFileRecordProcessor ex
       } else {
         row[0] = key;
         row[1] = value;
-        mergeOp.processOp(row, 0);
+        mergeOp.process(row, 0);
       }
     } catch (Throwable e) {
       abort = true;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Wed Mar 18 05:03:23 2015
@@ -19,11 +19,14 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.tez.runtime.api.ObjectRegistry;
 
 import com.google.common.base.Preconditions;
 
@@ -41,6 +44,8 @@ public class ObjectCache implements org.
   // before anything else.
   private volatile static ObjectRegistry staticRegistry;
 
+  private static ExecutorService staticPool;
+
   private final ObjectRegistry registry;
 
   public ObjectCache() {
@@ -51,6 +56,7 @@ public class ObjectCache implements org.
 
   public static void setupObjectRegistry(ObjectRegistry objectRegistry) {
     staticRegistry = objectRegistry;
+    staticPool = Executors.newCachedThreadPool();
   }
 
   @Override
@@ -59,21 +65,32 @@ public class ObjectCache implements org.
     LOG.info("Releasing key: " + key);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public Object retrieve(String key, Callable<?> fn) throws HiveException {
-    Object o;
+  public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
+    T value;
     try {
-      o = registry.get(key);
-      if (o == null) {
-	o = fn.call();
-	LOG.info("Caching key: " + key);
-	registry.cacheForVertex(key, o);
+      value = (T) registry.get(key);
+      if (value == null) {
+        value = fn.call();
+        LOG.info("Caching key: " + key);
+        registry.cacheForVertex(key, value);
       } else {
-	LOG.info("Found " + key + " in cache with value: " + o);
+        LOG.info("Found " + key + " in cache with value: " + value);
       }
     } catch (Exception e) {
       throw new HiveException(e);
     }
-    return o;
+    return value;
+  }
+
+  @Override
+  public <T> Future<T> retrieveAsync(final String key, final Callable<T> fn) throws HiveException {
+    return staticPool.submit(new Callable<T>() {
+      @Override
+      public T call() throws Exception {
+        return retrieve(key, fn);
+      }
+    });
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Mar 18 05:03:23 2015
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -85,6 +86,7 @@ public class ReduceRecordProcessor  exte
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
     cacheKey = queryId + REDUCE_PLAN_KEY;
     redWork = (ReduceWork) cache.retrieve(cacheKey, new Callable<Object>() {
+        @Override
         public Object call() {
           return Utilities.getReduceWork(jconf);
         }
@@ -103,9 +105,14 @@ public class ReduceRecordProcessor  exte
     for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
       TableDesc keyTableDesc = redWork.getKeyDesc();
       TableDesc valueTableDesc = redWork.getTagToValueDesc().get(tag);
-      KeyValuesReader reader =
-          (KeyValuesReader) inputs.get(redWork.getTagToInput().get(tag)).getReader();
 
+      // make the reader ready for prime time
+      Input input = inputs.get(redWork.getTagToInput().get(tag));
+      input.start();
+      processorContext.waitForAnyInputReady(Collections.singleton(input));
+      KeyValuesReader reader = (KeyValuesReader) input.getReader();
+
+      // now we can setup the record source
       sources[tag] = new ReduceRecordSource();
       sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc,
           reader, tag == position, (byte) tag,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Wed Mar 18 05:03:23 2015
@@ -325,7 +325,7 @@ public class ReduceRecordSource implemen
       row.add(deserializeValue(valueWritable, tag));
 
       try {
-        reducer.processOp(row, tag);
+        reducer.process(row, tag);
       } catch (Exception e) {
         String rowString = null;
         try {
@@ -364,7 +364,7 @@ public class ReduceRecordSource implemen
         rowIdx++;
         if (rowIdx >= BATCH_SIZE) {
           VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-          reducer.processOp(batch, tag);
+          reducer.process(batch, tag);
 
           // Reset just the value columns and value buffer.
           for (int i = keysColumnOffset; i < batch.numCols; i++) {
@@ -377,7 +377,7 @@ public class ReduceRecordSource implemen
       if (rowIdx > 0) {
         // Flush final partial batch.
         VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-        reducer.processOp(batch, tag);
+        reducer.process(batch, tag);
       }
       batch.reset();
       keyBuffer.reset();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Wed Mar 18 05:03:23 2015
@@ -19,9 +19,9 @@ package org.apache.hadoop.hive.ql.exec.t
 
 import java.io.IOException;
 import java.text.NumberFormat;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,7 +33,6 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -143,20 +142,6 @@ public class TezProcessor extends Abstra
       throws Exception {
     Throwable originalThrowable = null;
     try {
-      // Outputs will be started later by the individual Processors.
-      TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
-      // Start the actual Inputs. After MRInput initialization.
-      for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
-        if (!cacheAccess.isInputCached(inputEntry.getKey())) {
-          LOG.info("Starting input " + inputEntry.getKey());
-          inputEntry.getValue().start();
-          processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputEntry
-              .getValue())));
-        } else {
-          LOG.info("Input: " + inputEntry.getKey()
-              + " is already cached. Skipping start and wait for ready");
-        }
-      }
 
       MRTaskReporter mrReporter = new MRTaskReporter(getContext());
       rproc.init(jobConf, getContext(), mrReporter, inputs, outputs);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java Wed Mar 18 05:03:23 2015
@@ -18,7 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
@@ -27,15 +28,9 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -59,18 +54,19 @@ public class VectorAppMasterEventOperato
   }
 
   @Override
-  public void initializeOp(Configuration hconf) throws HiveException {
-    super.initializeOp(hconf);
+  public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     valueWriters = VectorExpressionWriterFactory.getExpressionWriters(
         (StructObjectInspector) inputObjInspectors[0]);
     singleRow = new Object[valueWriters.length];
+    return result;
   }
 
   @Override
-  public void processOp(Object data, int tag) throws HiveException {
-    
+  public void process(Object data, int tag) throws HiveException {
+
     VectorizedRowBatch vrg = (VectorizedRowBatch) data;
-    
+
     Writable [] records = null;
     Writable recordValue = null;
     boolean vectorizedSerde = false;
@@ -85,7 +81,7 @@ public class VectorAppMasterEventOperato
     } catch (SerDeException e1) {
       throw new HiveException(e1);
     }
-    
+
     for (int i = 0; i < vrg.size; i++) {
       Writable row = null;
       if (vectorizedSerde) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Wed Mar 18 05:03:23 2015
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.util.Collection;
+import java.util.concurrent.Future;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -50,7 +53,7 @@ public class VectorFileSinkOperator exte
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
     // We need a input object inspector that is for the row we will extract out of the
     // vectorized row batch, not for example, an original inspector for an ORC table, etc.
     VectorExpressionWriterFactory.processVectorInspector(
@@ -66,15 +69,15 @@ public class VectorFileSinkOperator exte
     singleRow = new Object[valueWriters.length];
 
     // Call FileSinkOperator with new input inspector.
-    super.initializeOp(hconf);
+    return super.initializeOp(hconf);
   }
 
   @Override
-  public void processOp(Object data, int tag) throws HiveException {
+  public void process(Object data, int tag) throws HiveException {
     VectorizedRowBatch vrg = (VectorizedRowBatch)data;
     for (int i = 0; i < vrg.size; i++) {
       Object[] row = getRowObject(vrg, i);
-      super.processOp(row, tag);
+      super.process(row, tag);
     }
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java Wed Mar 18 05:03:23 2015
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.util.Collection;
+import java.util.concurrent.Future;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -27,7 +30,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 
 /**
  * Filter operator implementation.
@@ -39,7 +41,7 @@ public class VectorFilterOperator extend
   private VectorExpression conditionEvaluator = null;
 
   // Temporary selected vector
-  private int[] temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE];
+  private final int[] temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE];
 
   // filterMode is 1 if condition is always true, -1 if always false
   // and 0 if condition needs to be computed.
@@ -59,7 +61,8 @@ public class VectorFilterOperator extend
 
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     try {
       heartbeatInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVESENDHEARTBEAT);
@@ -74,7 +77,7 @@ public class VectorFilterOperator extend
         filterMode = -1;
       }
     }
-    initializeChildren(hconf);
+    return result;
   }
 
   public void setFilterCondition(VectorExpression expr) {
@@ -82,7 +85,7 @@ public class VectorFilterOperator extend
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
 
     VectorizedRowBatch vrg = (VectorizedRowBatch) row;
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Wed Mar 18 05:03:23 2015
@@ -22,18 +22,19 @@ import java.lang.management.ManagementFa
 import java.lang.management.MemoryMXBean;
 import java.lang.ref.SoftReference;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.Ag
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -54,7 +56,8 @@ import org.apache.hadoop.io.DataOutputBu
  * stores the aggregate operators' intermediate states. Emits row mode output.
  *
  */
-public class VectorGroupByOperator extends GroupByOperator implements VectorizationContextRegion {
+public class VectorGroupByOperator extends Operator<GroupByDesc> implements
+    VectorizationContextRegion {
 
   private static final Log LOG = LogFactory.getLog(
       VectorGroupByOperator.class.getName());
@@ -100,7 +103,15 @@ public class VectorGroupByOperator exten
   private transient VectorizedRowBatchCtx vrbCtx;
 
   private transient VectorColumnAssign[] vectorColumnAssign;
-  
+
+  private transient int numEntriesHashTable;
+
+  private transient long maxHashTblMemory;
+
+  private transient long maxMemory;
+
+  private float memoryThreshold;
+
   /**
    * Interface for processing mode: global, hash, unsorted streaming, or group batch
    */
@@ -118,9 +129,11 @@ public class VectorGroupByOperator exten
   private abstract class ProcessingModeBase implements IProcessingMode {
 
     // Overridden and used in sorted reduce group batch processing mode.
+    @Override
     public void startGroup() throws HiveException {
       // Do nothing.
     }
+    @Override
     public void endGroup() throws HiveException {
       // Do nothing.
     }
@@ -177,7 +190,7 @@ public class VectorGroupByOperator exten
   private class ProcessingModeGlobalAggregate extends ProcessingModeBase {
 
     /**
-     * In global processing mode there is only one set of aggregation buffers 
+     * In global processing mode there is only one set of aggregation buffers
      */
     private VectorAggregationBufferRow aggregationBuffers;
 
@@ -233,7 +246,7 @@ public class VectorGroupByOperator exten
     private long sumBatchSize;
 
     /**
-     * Max number of entries in the vector group by aggregation hashtables. 
+     * Max number of entries in the vector group by aggregation hashtables.
      * Exceeding this will trigger a flush irrelevant of memory pressure condition.
      */
     private int maxHtEntries = 1000000;
@@ -247,12 +260,12 @@ public class VectorGroupByOperator exten
      * Percent of entries to flush when memory threshold exceeded.
      */
     private float percentEntriesToFlush = 0.1f;
-  
+
     /**
      * A soft reference used to detect memory pressure
      */
     private SoftReference<Object> gcCanary = new SoftReference<Object>(new Object());
-    
+
     /**
      * Counts the number of time the gcCanary died and was resurrected
      */
@@ -289,7 +302,7 @@ public class VectorGroupByOperator exten
             HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
           this.numRowsCompareHashAggr = HiveConf.getIntVar(hconf,
             HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL);
-      } 
+      }
       else {
         this.percentEntriesToFlush =
             HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT.defaultFloatVal;
@@ -322,14 +335,14 @@ public class VectorGroupByOperator exten
       processAggregators(batch);
 
       //Flush if memory limits were reached
-      // We keep flushing until the memory is under threshold 
+      // We keep flushing until the memory is under threshold
       int preFlushEntriesCount = numEntriesHashTable;
       while (shouldFlush(batch)) {
         flush(false);
 
         if(gcCanary.get() == null) {
           gcCanaryFlushes++;
-          gcCanary = new SoftReference<Object>(new Object()); 
+          gcCanary = new SoftReference<Object>(new Object());
         }
 
         //Validate that some progress is being made
@@ -468,7 +481,7 @@ public class VectorGroupByOperator exten
         mapKeysAggregationBuffers.clear();
         numEntriesHashTable = 0;
       }
-      
+
       if (all && LOG.isDebugEnabled()) {
         LOG.debug(String.format("GC canary caused %d flushes", gcCanaryFlushes));
       }
@@ -495,7 +508,7 @@ public class VectorGroupByOperator exten
       if (gcCanary.get() == null) {
         return true;
       }
-      
+
       return false;
     }
 
@@ -515,14 +528,14 @@ public class VectorGroupByOperator exten
     }
 
     /**
-     * Checks if the HT reduces the number of entries by at least minReductionHashAggr factor 
+     * Checks if the HT reduces the number of entries by at least minReductionHashAggr factor
      * @throws HiveException
      */
     private void checkHashModeEfficiency() throws HiveException {
       if (lastModeCheckRowCount > numRowsCompareHashAggr) {
         lastModeCheckRowCount = 0;
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("checkHashModeEfficiency: HT:%d RC:%d MIN:%d", 
+          LOG.debug(String.format("checkHashModeEfficiency: HT:%d RC:%d MIN:%d",
               numEntriesHashTable, sumBatchSize, (long)(sumBatchSize * minReductionHashAggr)));
         }
         if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
@@ -541,7 +554,7 @@ public class VectorGroupByOperator exten
    */
   private class ProcessingModeUnsortedStreaming extends ProcessingModeBase {
 
-    /** 
+    /**
      * The aggregation buffers used in streaming mode
      */
     private VectorAggregationBufferRow currentStreamingAggregators;
@@ -554,19 +567,19 @@ public class VectorGroupByOperator exten
     /**
      * The keys that needs to be flushed at the end of the current batch
      */
-    private final VectorHashKeyWrapper[] keysToFlush = 
+    private final VectorHashKeyWrapper[] keysToFlush =
         new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE];
 
     /**
      * The aggregates that needs to be flushed at the end of the current batch
      */
-    private final VectorAggregationBufferRow[] rowsToFlush = 
+    private final VectorAggregationBufferRow[] rowsToFlush =
         new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE];
 
     /**
      * A pool of VectorAggregationBufferRow to avoid repeated allocations
      */
-    private VectorUtilBatchObjectPool<VectorAggregationBufferRow> 
+    private VectorUtilBatchObjectPool<VectorAggregationBufferRow>
       streamAggregationBufferRowPool;
 
     @Override
@@ -658,7 +671,7 @@ public class VectorGroupByOperator exten
    *      vectorized reduce-shuffle feeds the batches to us.
    *
    *   2) Later at endGroup after reduce-shuffle has fed us all the input batches for the group,
-   *      we fill in the aggregation columns in outputBatch at outputBatch.size.  Our method 
+   *      we fill in the aggregation columns in outputBatch at outputBatch.size.  Our method
    *      writeGroupRow does this and finally increments outputBatch.size.
    *
    */
@@ -672,7 +685,7 @@ public class VectorGroupByOperator exten
      */
     VectorGroupKeyHelper groupKeyHelper;
 
-    /** 
+    /**
      * The group vector aggregation buffers.
      */
     private VectorAggregationBufferRow groupAggregators;
@@ -750,7 +763,7 @@ public class VectorGroupByOperator exten
       AggregationDesc aggDesc = aggrDesc.get(i);
       aggregators[i] = vContext.getAggregatorExpression(aggDesc, desc.getVectorDesc().isReduce());
     }
-    
+
     isVectorOutput = desc.getVectorDesc().isVectorOutput();
 
     vOutContext = new VectorizationContext(desc.getOutputColumnNames());
@@ -762,7 +775,8 @@ public class VectorGroupByOperator exten
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
 
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
 
@@ -773,9 +787,9 @@ public class VectorGroupByOperator exten
 
       // grouping id should be pruned, which is the last of key columns
       // see ColumnPrunerGroupByProc
-      outputKeyLength = 
+      outputKeyLength =
           conf.pruneGroupingSetId() ? keyExpressions.length - 1 : keyExpressions.length;
-      
+
       keyOutputWriters = new VectorExpressionWriter[outputKeyLength];
 
       for(int i = 0; i < outputKeyLength; ++i) {
@@ -812,8 +826,6 @@ public class VectorGroupByOperator exten
       throw new HiveException(e);
     }
 
-    initializeChildren(hconf);
-
     forwardCache = new Object[outputKeyLength + aggregators.length];
 
     if (outputKeyLength == 0) {
@@ -826,13 +838,14 @@ public class VectorGroupByOperator exten
       processingMode = this.new ProcessingModeHashAggregate();
     }
     processingMode.initialize(hconf);
+    return result;
   }
 
   /**
    * changes the processing mode to unsorted streaming
-   * This is done at the request of the hash agg mode, if the number of keys 
+   * This is done at the request of the hash agg mode, if the number of keys
    * exceeds the minReductionHashAggr factor
-   * @throws HiveException 
+   * @throws HiveException
    */
   private void changeToUnsortedStreamingMode() throws HiveException {
     processingMode = this.new ProcessingModeUnsortedStreaming();
@@ -859,7 +872,7 @@ public class VectorGroupByOperator exten
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     VectorizedRowBatch batch = (VectorizedRowBatch) row;
 
     if (batch.size > 0) {
@@ -962,4 +975,9 @@ public class VectorGroupByOperator exten
   public VectorizationContext getOuputVectorizationContext() {
     return vOutContext;
   }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.GROUPBY;
+  }
 }
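
Taken together, the operator changes in this commit follow a single pattern: initializeOp() now returns the Collection<Future<?>> obtained from super.initializeOp() instead of being void, the explicit initializeChildren(hconf) calls are dropped (the removed calls suggest the base class now takes care of child initialization), and processOp() is renamed to process(). A minimal, hypothetical subclass, with the other required Operator overrides omitted, would look roughly like the sketch below; it is illustrative only and not code from this patch:

      @Override
      protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
        Collection<Future<?>> result = super.initializeOp(hconf);
        // ... operator-specific setup ...
        return result;  // no initializeChildren() call anymore
      }

      @Override
      public void process(Object row, int tag) throws HiveException {
        // ... per-row (or per-batch) work; formerly named processOp ...
        forward(row, inputObjInspectors[tag]);
      }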

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java Wed Mar 18 05:03:23 2015
@@ -39,7 +39,7 @@ public class VectorLimitOperator extends
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     VectorizedRowBatch batch = (VectorizedRowBatch) row;
 
     if (currCount < limit) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java Wed Mar 18 05:03:23 2015
@@ -19,9 +19,11 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,7 +58,7 @@ public class VectorMapJoinOperator exten
 
   private VectorExpression[] bigTableFilterExpressions;
   private VectorExpression[] bigTableValueExpressions;
-  
+
   private VectorizationContext vOutContext;
 
   // The above members are initialized by the constructor and must not be
@@ -76,7 +78,7 @@ public class VectorMapJoinOperator exten
   private transient VectorExpressionWriter[] keyOutputWriters;
 
   private transient VectorizedRowBatchCtx vrbCtx = null;
-  
+
   public VectorMapJoinOperator() {
     super();
   }
@@ -112,9 +114,9 @@ public class VectorMapJoinOperator exten
   }
 
   @Override
-  public void initializeOp(Configuration hconf) throws HiveException {
-    super.initializeOp(hconf);
-    
+  public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
+
     List<ExprNodeDesc> keyDesc = conf.getKeys().get(posBigTable);
     keyOutputWriters = VectorExpressionWriterFactory.getExpressionWriters(keyDesc);
 
@@ -178,6 +180,7 @@ public class VectorMapJoinOperator exten
     filterMaps[posBigTable] = null;
 
     outputVectorAssigners = new HashMap<ObjectInspector, VectorColumnAssign[]>();
+    return result;
   }
 
   /**
@@ -220,7 +223,7 @@ public class VectorMapJoinOperator exten
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     byte alias = (byte) tag;
     VectorizedRowBatch inBatch = (VectorizedRowBatch) row;
 
@@ -246,7 +249,7 @@ public class VectorMapJoinOperator exten
     // of row-mode small-tables) this is a reasonable trade-off.
     //
     for(batchIndex=0; batchIndex < inBatch.size; ++batchIndex) {
-      super.processOp(row, tag);
+      super.process(row, tag);
     }
 
     // Set these two to invalid values so any attempt to use them

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1667454&r1=1667453&r2=1667454&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Wed Mar 18 05:03:23 2015
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.util.Collection;
+import java.util.concurrent.Future;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -34,7 +37,7 @@ public class VectorReduceSinkOperator ex
 
   // Writer for producing row from input batch.
   private VectorExpressionWriter[] rowWriters;
-  
+
   protected transient Object[] singleRow;
 
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
@@ -49,7 +52,7 @@ public class VectorReduceSinkOperator ex
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
     // We need a input object inspector that is for the row we will extract out of the
     // vectorized row batch, not for example, an original inspector for an ORC table, etc.
     VectorExpressionWriterFactory.processVectorInspector(
@@ -64,17 +67,16 @@ public class VectorReduceSinkOperator ex
             });
     singleRow = new Object[rowWriters.length];
 
-    // Call ReduceSinkOperator with new input inspector.
-    super.initializeOp(hconf);
+    return super.initializeOp(hconf);
   }
 
   @Override
-  public void processOp(Object data, int tag) throws HiveException {
+  public void process(Object data, int tag) throws HiveException {
     VectorizedRowBatch vrg = (VectorizedRowBatch) data;
 
     for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
       Object row = getRowObject(vrg, batchIndex);
-      super.processOp(row, tag);
+      super.process(row, tag);
     }
   }
 


