hive-commits mailing list archives

From: hashut...@apache.org
Subject: svn commit: r1668750 [2/8] - in /hive/branches/cbo: ./ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/common/type/ common/src/java/org/apache/hadoop/hive/conf/ common/src/java/org/apache/hive/common/util/ common/s...
Date: Mon, 23 Mar 2015 22:02:16 GMT
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java Mon Mar 23 22:02:13 2015
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.util.Collection;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.LateralViewForwardDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -33,7 +37,7 @@ public class LateralViewForwardOperator
   private static final long serialVersionUID = 1L;
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     forward(row, inputObjInspectors[tag]);
   }
 
@@ -50,4 +54,9 @@ public class LateralViewForwardOperator
   public OperatorType getType() {
     return OperatorType.LATERALVIEWFORWARD;
   }
+
+  @Override
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    return super.initializeOp(hconf);
+  }
 }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java Mon Mar 23 22:02:13 2015
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -82,7 +84,8 @@ public class LateralViewJoinOperator ext
   public static final byte UDTF_TAG = 1;
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
 
     ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
     ArrayList<String> fieldNames = conf.getOutputInternalColNames();
@@ -104,9 +107,8 @@ public class LateralViewJoinOperator ext
 
     outputObjInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(fieldNames, ois);
+    return result;
 
-    // Initialize the rest of the operator DAG
-    super.initializeOp(hconf);
   }
 
   // acc is short for accumulator. It's used to build the row before forwarding
@@ -121,7 +123,7 @@ public class LateralViewJoinOperator ext
    * by all the corresponding rows from the UDTF operator. And so on.
    */
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
     if (tag == SELECT_TAG) {
       selectObjs.clear();

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java Mon Mar 23 22:02:13 2015
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
+import java.util.Collection;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -37,16 +39,17 @@ public class LimitOperator extends Opera
   protected transient boolean isMap;
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
-    super.initializeOp(hconf);
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     limit = conf.getLimit();
     leastRow = conf.getLeastRows();
     currCount = 0;
     isMap = hconf.getBoolean("mapred.task.is.map", true);
+    return result;
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     if (currCount < limit) {
       forward(row, inputObjInspectors[tag]);
       currCount++;

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java Mon Mar 23 22:02:13 2015
@@ -18,11 +18,12 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -43,13 +44,14 @@ public class ListSinkOperator extends Op
   private transient int numRows;
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     try {
       fetcher = initializeFetcher(hconf);
     } catch (Exception e) {
       throw new HiveException(e);
     }
-    super.initializeOp(hconf);
+    return result;
   }
 
   private FetchFormatter initializeFetcher(Configuration conf) throws Exception {
@@ -81,8 +83,9 @@ public class ListSinkOperator extends Op
     return numRows;
   }
 
+  @Override
   @SuppressWarnings("unchecked")
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     try {
       res.add(fetcher.convert(row, inputObjInspectors[0]));
       numRows++;
@@ -91,6 +94,7 @@ public class ListSinkOperator extends Op
     }
   }
 
+  @Override
   public OperatorType getType() {
     return OperatorType.FORWARD;
   }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Mon Mar 23 22:02:13 2015
@@ -18,26 +18,38 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
 import org.apache.hadoop.hive.ql.exec.persistence.UnwrapRowContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.KeyValueContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -47,8 +59,13 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import static org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition;
+import static org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper;
+
 /**
  * Map side Join operator implementation.
  */
@@ -70,6 +87,12 @@ public class MapJoinOperator extends Abs
   private transient ReusableGetAdaptor[] hashMapRowGetters;
 
   private UnwrapRowContainer[] unwrapContainer;
+  private transient Configuration hconf;
+  private transient boolean useHybridGraceHashJoin; // whether Hybrid Grace Hash Join is enabled
+  private transient boolean hybridMapJoinLeftover;  // whether there's spilled data to be processed
+  private transient MapJoinBytesTableContainer currentSmallTable; // reloaded hashmap from disk
+  private transient int tag;        // big table alias
+  private transient int smallTable; // small table alias
 
   public MapJoinOperator() {
   }
@@ -94,9 +117,14 @@ public class MapJoinOperator extends Abs
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    this.hconf = hconf;
     unwrapContainer = new UnwrapRowContainer[conf.getTagLength()];
-    super.initializeOp(hconf);
+
+    Collection<Future<?>> result = super.initializeOp(hconf);
+    if (result == null) {
+      result = new HashSet<Future<?>>();
+    }
 
     int tagLen = conf.getTagLength();
 
@@ -113,10 +141,15 @@ public class MapJoinOperator extends Abs
     mapJoinTables = new MapJoinTableContainer[tagLen];
     mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
     hashTblInitedOnce = false;
+    useHybridGraceHashJoin =
+        HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN);
 
     generateMapMetaData();
 
-    if (!conf.isBucketMapJoin()) {
+    final ExecMapperContext mapContext = getExecContext();
+    final MapredContext mrContext = MapredContext.get();
+
+    if (!conf.isBucketMapJoin() && !useHybridGraceHashJoin) {
       /*
        * The issue with caching in case of bucket map join is that different tasks
        * process different buckets and if the container is reused to join a different bucket,
@@ -126,27 +159,42 @@ public class MapJoinOperator extends Abs
        * also ability to schedule tasks to re-use containers that have cached the specific bucket.
        */
       if (isLogInfoEnabled) {
-	LOG.info("This is not bucket map join, so cache");
+        LOG.info("This is not bucket map join, so cache");
       }
 
-      Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> pair =
-	(Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>)
-	cache.retrieve(cacheKey, new Callable<Object>() {
-	  public Object call() throws HiveException {
-	    return loadHashTable();
-	  }
-	});
+      Future<Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>> future =
+          cache.retrieveAsync(
+              cacheKey,
+              new Callable<Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>>() {
+                @Override
+                public Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> call()
+                    throws HiveException {
+                  return loadHashTable(mapContext, mrContext);
+                }
+              });
+      result.add(future);
+    } else if (mapContext == null || mapContext.getLocalWork() == null
+        || mapContext.getLocalWork().getInputFileChangeSensitive() == false) {
+      loadHashTable(mapContext, mrContext);
+      hashTblInitedOnce = true;
+    }
+    return result;
+  }
 
+  @SuppressWarnings("unchecked")
+  @Override
+  protected final void completeInitializationOp(Object[] os) throws HiveException {
+    if (os.length != 0) {
+      Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> pair =
+          (Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>) os[0];
       mapJoinTables = pair.getLeft();
       mapJoinTableSerdes = pair.getRight();
       hashTblInitedOnce = true;
-    } else {
-      loadHashTable();
     }
 
     if (this.getExecContext() != null) {
       // reset exec context so that initialization of the map operator happens
-      // poperly
+      // properly
       this.getExecContext().setLastInputPath(null);
       this.getExecContext().setCurrentInputPath(null);
     }
@@ -182,45 +230,44 @@ public class MapJoinOperator extends Abs
     try {
       TableDesc keyTableDesc = conf.getKeyTblDesc();
       SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
-	  null);
+          null);
       SerDeUtils.initializeSerDe(keySerializer, null, keyTableDesc.getProperties(), null);
       MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerializer, false);
       for (int pos = 0; pos < order.length; pos++) {
-	if (pos == posBigTable) {
-	  continue;
-	}
-	TableDesc valueTableDesc;
-	if (conf.getNoOuterJoin()) {
-	  valueTableDesc = conf.getValueTblDescs().get(pos);
-	} else {
-	  valueTableDesc = conf.getValueFilteredTblDescs().get(pos);
-	}
-	SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
-	    null);
-	SerDeUtils.initializeSerDe(valueSerDe, null, valueTableDesc.getProperties(), null);
-	MapJoinObjectSerDeContext valueContext = new MapJoinObjectSerDeContext(valueSerDe, hasFilter(pos));
-	mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, valueContext);
+        if (pos == posBigTable) {
+          continue;
+        }
+        TableDesc valueTableDesc;
+        if (conf.getNoOuterJoin()) {
+          valueTableDesc = conf.getValueTblDescs().get(pos);
+        } else {
+          valueTableDesc = conf.getValueFilteredTblDescs().get(pos);
+        }
+        SerDe valueSerDe =
+            (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(valueSerDe, null, valueTableDesc.getProperties(), null);
+        MapJoinObjectSerDeContext valueContext =
+            new MapJoinObjectSerDeContext(valueSerDe, hasFilter(pos));
+        mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, valueContext);
       }
     } catch (SerDeException e) {
       throw new HiveException(e);
     }
   }
 
-  private Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>
-    loadHashTable() throws HiveException {
+  private Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> loadHashTable(
+      ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
 
     if (this.hashTblInitedOnce
-	&& ((this.getExecContext() == null)
-	    || (this.getExecContext().getLocalWork() == null)
-	    || (this.getExecContext().getLocalWork().getInputFileChangeSensitive()
-		== false))) {
+        && ((mapContext == null) || (mapContext.getLocalWork() == null) || (mapContext
+            .getLocalWork().getInputFileChangeSensitive() == false))) {
       // no need to reload
-      return new ImmutablePair<MapJoinTableContainer[],
-	MapJoinTableContainerSerDe[]> (mapJoinTables, mapJoinTableSerdes);
+      return new ImmutablePair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>(
+          mapJoinTables, mapJoinTableSerdes);
     }
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
-    loader.init(getExecContext(), hconf, this);
+    loader.init(mapContext, mrContext, hconf, this);
     long memUsage = (long)(MapJoinMemoryExhaustionHandler.getMaxHeapSize()
         * conf.getHashTableMemoryUsage());
     loader.load(mapJoinTables, mapJoinTableSerdes, memUsage);
@@ -239,12 +286,12 @@ public class MapJoinOperator extends Abs
   // Load the hash table
   @Override
   public void cleanUpInputFileChangedOp() throws HiveException {
-    loadHashTable();
+    loadHashTable(getExecContext(), MapredContext.get());
   }
 
-  protected void setMapJoinKey(
+  protected JoinUtil.JoinResult setMapJoinKey(
       ReusableGetAdaptor dest, Object row, byte alias) throws HiveException {
-    dest.setFromRow(row, joinKeys[alias], joinKeysObjectInspectors[alias]);
+    return dest.setFromRow(row, joinKeys[alias], joinKeysObjectInspectors[alias]);
   }
 
   protected MapJoinKey getRefKey(byte alias) {
@@ -260,7 +307,19 @@ public class MapJoinOperator extends Abs
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
+    this.tag = tag;
+
+    // As we're calling processOp again to process the leftover triplets, we know the "row" is
+    // coming from the on-disk matchfile. We need to recreate hashMapRowGetter against new hashtable
+    if (hybridMapJoinLeftover) {
+      assert hashMapRowGetters != null;
+      if (hashMapRowGetters[smallTable] == null) {
+        MapJoinKey refKey = getRefKey((byte) tag);
+        hashMapRowGetters[smallTable] = currentSmallTable.createGetter(refKey);
+      }
+    }
+
     try {
       alias = (byte) tag;
       if (hashMapRowGetters == null) {
@@ -279,14 +338,16 @@ public class MapJoinOperator extends Abs
       boolean joinNeeded = false;
       for (byte pos = 0; pos < order.length; pos++) {
         if (pos != alias) {
+          smallTable = pos; // record small table alias
+          JoinUtil.JoinResult joinResult;
           ReusableGetAdaptor adaptor;
           if (firstSetKey == null) {
             adaptor = firstSetKey = hashMapRowGetters[pos];
-            setMapJoinKey(firstSetKey, row, alias);
+            joinResult = setMapJoinKey(firstSetKey, row, alias);
           } else {
             // Keys for all tables are the same, so only the first has to deserialize them.
             adaptor = hashMapRowGetters[pos];
-            adaptor.setFromOther(firstSetKey);
+            joinResult = adaptor.setFromOther(firstSetKey);
           }
           MapJoinRowContainer rowContainer = adaptor.getCurrentRows();
           if (rowContainer != null && unwrapContainer[pos] != null) {
@@ -296,8 +357,13 @@ public class MapJoinOperator extends Abs
           // there is no join-value or join-key has all null elements
           if (rowContainer == null || firstSetKey.hasAnyNulls(fieldCount, nullsafes)) {
             if (!noOuterJoin) {
-              joinNeeded = true;
-              storage[pos] = dummyObjVectors[pos];
+              // For Hybrid Grace Hash Join, during the 1st round processing,
+              // we only keep the LEFT side if the row is not spilled
+              if (!useHybridGraceHashJoin || hybridMapJoinLeftover ||
+                  (!hybridMapJoinLeftover && joinResult != JoinUtil.JoinResult.SPILL)) {
+                joinNeeded = true;
+                storage[pos] = dummyObjVectors[pos];
+              }
             } else {
               storage[pos] = emptyList;
             }
@@ -306,6 +372,10 @@ public class MapJoinOperator extends Abs
             storage[pos] = rowContainer.copy();
             aliasFilterTags[pos] = rowContainer.getAliasFilter();
           }
+          // Spill the big table rows into appropriate partition
+          if (joinResult == JoinUtil.JoinResult.SPILL) {
+            spillBigTableRow(mapJoinTables[pos], row);
+          }
         }
       }
       if (joinNeeded) {
@@ -329,11 +399,60 @@ public class MapJoinOperator extends Abs
     }
   }
 
+  /**
+   * Postpone processing the big table row temporarily by spilling it to a row container
+   * @param hybridHtContainer Hybrid hashtable container
+   * @param row big table row
+   */
+  private void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object row) {
+    HybridHashTableContainer ht = (HybridHashTableContainer) hybridHtContainer;
+    int partitionId = ht.getToSpillPartitionId();
+    HashPartition hp = ht.getHashPartitions()[partitionId];
+    ObjectContainer bigTable = hp.getMatchfileObjContainer();
+    bigTable.add(row);
+  }
+
   @Override
   public void closeOp(boolean abort) throws HiveException {
     for (MapJoinTableContainer tableContainer : mapJoinTables) {
       if (tableContainer != null) {
         tableContainer.dumpMetrics();
+
+        if (tableContainer instanceof HybridHashTableContainer) {
+          HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer;
+          hybridHtContainer.dumpStats();
+
+          HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions();
+          // Clear all in memory partitions first
+          for (int i = 0; i < hashPartitions.length; i++) {
+            if (!hashPartitions[i].isHashMapOnDisk()) {
+              hybridHtContainer.setTotalInMemRowCount(
+                  hybridHtContainer.getTotalInMemRowCount() -
+                      hashPartitions[i].getHashMapFromMemory().getNumValues());
+              hashPartitions[i].getHashMapFromMemory().clear();
+            }
+          }
+          assert hybridHtContainer.getTotalInMemRowCount() == 0;
+
+          for (int i = 0; i < hashPartitions.length; i++) {
+            if (hashPartitions[i].isHashMapOnDisk()) {
+              // Recursively process on-disk triplets (hash partition, sidefile, matchfile)
+              try {
+                hybridMapJoinLeftover = true;
+                hashMapRowGetters[smallTable] = null;
+                continueProcess(hashPartitions[i], hybridHtContainer);
+              } catch (IOException e) {
+                e.printStackTrace();
+              } catch (ClassNotFoundException e) {
+                e.printStackTrace();
+              } catch (SerDeException e) {
+                e.printStackTrace();
+              }
+            }
+            hybridMapJoinLeftover = false;
+            currentSmallTable = null;
+          }
+        }
       }
     }
     if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null)
@@ -350,6 +469,84 @@ public class MapJoinOperator extends Abs
   }
 
   /**
+   * Continue processing each pair of spilled hashtable and big table row container,
+   * by bringing them back to memory and calling process() again.
+   * @param partition hash partition to process
+   * @param hybridHtContainer Hybrid hashtable container
+   * @throws HiveException
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws SerDeException
+   */
+  private void continueProcess(HashPartition partition, HybridHashTableContainer hybridHtContainer)
+      throws HiveException, IOException, ClassNotFoundException, SerDeException {
+    reloadHashTable(partition, hybridHtContainer);
+    // Iterate thru the on-disk matchfile, and feed processOp with leftover rows
+    ObjectContainer bigTable = partition.getMatchfileObjContainer();
+    while (bigTable.hasNext()) {
+      Object row = bigTable.next();
+      process(row, tag);
+    }
+    bigTable.clear();
+  }
+
+  /**
+   * Reload hashtable from the hash partition.
+   * It can have two steps:
+   * 1) Deserialize a serialized hash table, and
+   * 2) Merge every key/value pair from small table container into the hash table
+   * @param partition hash partition to process
+   * @param hybridHtContainer Hybrid hashtable container
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws HiveException
+   * @throws SerDeException
+   */
+  private void reloadHashTable(HashPartition partition,
+                               HybridHashTableContainer hybridHtContainer)
+      throws IOException, ClassNotFoundException, HiveException, SerDeException {
+    // Deserialize the on-disk hash table
+    // We're sure this part is smaller than memory limit
+    BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk();
+    int rowCount = restoredHashMap.getNumValues();
+    LOG.info("Hybrid Grace Hash Join: Deserializing spilled hash partition...");
+    LOG.info("Hybrid Grace Hash Join: Number of rows restored from hashmap: " + rowCount);
+
+    // Merge the sidefile into the newly created hash table
+    // This is where the spilling may happen again
+    KeyValueContainer kvContainer = partition.getSidefileKVContainer();
+    rowCount += kvContainer.size();
+    LOG.info("Hybrid Grace Hash Join: Number of rows restored from KeyValueContainer: " +
+        kvContainer.size());
+
+    // If based on the new key count, keyCount is smaller than a threshold,
+    // then just load the entire restored hashmap into memory.
+    // The size of deserialized partition shouldn't exceed half of memory limit
+    if (rowCount * hybridHtContainer.getTableRowSize() >= hybridHtContainer.getMemoryThreshold() / 2) {
+      throw new RuntimeException("Hybrid Grace Hash Join: Hash table cannot be reloaded since it" +
+          " will be greater than memory limit. Recursive spilling is currently not supported");
+    }
+
+    KeyValueHelper writeHelper = hybridHtContainer.getWriteHelper();
+    while (kvContainer.hasNext()) {
+      ObjectPair<HiveKey, BytesWritable> pair = kvContainer.next();
+      Writable key = pair.getFirst();
+      Writable val = pair.getSecond();
+      writeHelper.setKeyValue(key, val);
+      restoredHashMap.put(writeHelper, -1);
+    }
+
+    hybridHtContainer.setTotalInMemRowCount(hybridHtContainer.getTotalInMemRowCount()
+        + restoredHashMap.getNumValues() + kvContainer.size());
+    kvContainer.clear();
+
+    // Since there's only one hashmap to deal with, it's OK to create a MapJoinBytesTableContainer
+    currentSmallTable = new MapJoinBytesTableContainer(restoredHashMap);
+    currentSmallTable.setInternalValueOi(hybridHtContainer.getInternalValueOi());
+    currentSmallTable.setSortableSortOrders(hybridHtContainer.getSortableSortOrders());
+  }
+
+  /**
    * Implements the getName function for the Node Interface.
    *
    * @return the name of the operator

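The MapJoinOperator hunks above wire in Hybrid Grace Hash Join: when a probe row
hashes to a spilled partition, setMapJoinKey()/setFromOther() report
JoinUtil.JoinResult.SPILL, the big-table row is parked in that partition's
matchfile, and closeOp() later reloads each on-disk partition and replays the
parked rows through process(). The standalone sketch below shows just that
spill-and-replay control flow; every name in it is a made-up stand-in, not one
of the Hive classes touched above.

    // Illustrative sketch of the grace-hash-join flow wired in above.
    // All types here are hypothetical stand-ins, not Hive's classes.
    import java.util.*;

    class GraceHashJoinSketch {
      static final int NUM_PARTITIONS = 4;

      // One partition: a build-side hash map that may be "spilled",
      // plus the probe rows deferred while it was on disk (the matchfile).
      static class Partition {
        Map<String, List<String>> hashMap = new HashMap<>();
        boolean spilled;
        List<String[]> deferredProbeRows = new ArrayList<>();
      }

      final Partition[] parts = new Partition[NUM_PARTITIONS];

      GraceHashJoinSketch() {
        for (int i = 0; i < NUM_PARTITIONS; i++) parts[i] = new Partition();
      }

      void build(String key, String value) {
        Partition p = parts[Math.floorMod(key.hashCode(), NUM_PARTITIONS)];
        p.hashMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
      }

      void probe(String key, String bigRow) {
        Partition p = parts[Math.floorMod(key.hashCode(), NUM_PARTITIONS)];
        if (p.spilled) {
          p.deferredProbeRows.add(new String[] {key, bigRow}); // the SPILL case
        } else {
          emitMatches(p, key, bigRow);
        }
      }

      // Mirrors closeOp() above: reload each spilled partition and
      // replay its deferred probe rows through the normal join path.
      void close() {
        for (Partition p : parts) {
          if (!p.spilled) continue;
          p.spilled = false; // "reloadHashTable": map is back in memory
          for (String[] r : p.deferredProbeRows) emitMatches(p, r[0], r[1]);
          p.deferredProbeRows.clear();
        }
      }

      void emitMatches(Partition p, String key, String bigRow) {
        for (String small : p.hashMap.getOrDefault(key, Collections.<String>emptyList())) {
          System.out.println(bigRow + " x " + small);
        }
      }
    }

Recursive spilling during replay is not modeled here; the real reloadHashTable()
above instead throws if a reloaded partition would not fit within half the
memory limit.
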
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Mon Mar 23 22:02:13 2015
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -29,8 +30,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Future;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -62,6 +63,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Map operator. This triggers overall map side processing. This is a little
  * different from regular operators in that it starts off by processing a
@@ -154,7 +157,7 @@ public class MapOperator extends Operato
       if (op.getDone()) {
         return false;
       }
-      op.processOp(row, 0);
+      op.process(row, 0);
       return true;
     }
   }
@@ -172,8 +175,8 @@ public class MapOperator extends Operato
   void initializeAsRoot(JobConf hconf, MapWork mapWork) throws Exception {
     setConf(mapWork);
     setChildren(hconf);
-    setExecContext(new ExecMapperContext(hconf));
-    initialize(hconf, null);
+    passExecContext(new ExecMapperContext(hconf));
+    initializeMapOperator(hconf);
   }
 
   private MapOpCtx initObjectInspector(Configuration hconf, MapOpCtx opCtx,
@@ -413,7 +416,11 @@ public class MapOperator extends Operato
   }
 
   @Override
-  public void initializeOp(Configuration hconf) throws HiveException {
+  public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    return super.initializeOp(hconf);
+  }
+
+  public void initializeMapOperator(Configuration hconf) throws HiveException {
     // set that parent initialization is done and call initialize on children
     state = State.INIT;
     statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
@@ -604,7 +611,7 @@ public class MapOperator extends Operato
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     throw new HiveException("Hive 2 Internal error: should not be called!");
   }
 

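MapOperator's root bring-up is split out above: the old initialize(hconf, null)
call is replaced by a dedicated initializeMapOperator(), and the exec context is
pushed down the tree with passExecContext() beforehand. A minimal usage sketch
of the new sequence, with jobConf (JobConf) and mapWork (MapWork) assumed to be
supplied by the caller and checked exceptions omitted for brevity:

    // Sketch of bringing up a MapOperator as the pipeline root,
    // following the initializeAsRoot() change above.
    MapOperator mo = new MapOperator();
    mo.setConf(mapWork);                                // attach the MapWork plan
    mo.setChildren(jobConf);                            // build the child operator tree
    mo.passExecContext(new ExecMapperContext(jobConf)); // push context to all children
    mo.initializeMapOperator(jobConf);                  // root-specific init; children
                                                        // are initialized from here
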
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java Mon Mar 23 22:02:13 2015
@@ -52,6 +52,7 @@ public class MapredContext {
         HiveConf.getVar(jobConf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ?
             new TezContext(isMap, jobConf) : new MapredContext(isMap, jobConf);
     contexts.set(context);
+    logger.info("MapredContext initialized.");
     return context;
   }
 

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java Mon Mar 23 22:02:13 2015
@@ -21,7 +21,9 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -169,7 +171,9 @@ public class MuxOperator extends Operato
   private transient long[] nextCntrs;
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
+
     // A MuxOperator should only have a single child
     if (childOperatorsArray.length != 1) {
       throw new HiveException(
@@ -204,7 +208,7 @@ public class MuxOperator extends Operato
       cntrs[i] = 0;
       nextCntrs[i] = 1;
     }
-    initializeChildren(hconf);
+    return result;
   }
 
   /**
@@ -230,7 +234,7 @@ public class MuxOperator extends Operato
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     if (isLogInfoEnabled) {
       cntrs[tag]++;
       if (cntrs[tag] == nextCntrs[tag]) {
@@ -247,11 +251,11 @@ public class MuxOperator extends Operato
       } else {
         if (forward[tag]) {
           // No need to evaluate, just forward it.
-          child.processOp(row, tag);
+          child.process(row, tag);
         } else {
           // Call the corresponding handler to evaluate this row and
           // forward the result
-          child.processOp(handlers[tag].process(row), handlers[tag].getTag());
+          child.process(handlers[tag].process(row), handlers[tag].getTag());
         }
       }
     }
@@ -269,7 +273,7 @@ public class MuxOperator extends Operato
     // we cannot pass new tag to this method which is used to get
     // the old tag from the mapping of newTagToOldTag, we bypass
     // this method in MuxOperator and directly call process on children
-    // in processOp() method..
+    // in process() method..
   }
 
   @Override
@@ -308,7 +312,7 @@ public class MuxOperator extends Operato
   protected void closeOp(boolean abort) throws HiveException {
     if (isLogInfoEnabled) {
       for (int i = 0; i < numParents; i++) {
-	LOG.info(id + ", tag=" + i + ", forwarded " + cntrs[i] + " rows");
+        LOG.info(id + ", tag=" + i + ", forwarded " + cntrs[i] + " rows");
       }
     }
   }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java Mon Mar 23 22:02:13 2015
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 /**
@@ -32,9 +34,23 @@ public interface ObjectCache {
 
   /**
    * Retrieve object from cache.
+   *
+   * @param <T>
+   * @param key
+   * @param fn
+   *          function to generate the object if it's not there
+   * @return the last cached object with the key, null if none.
+   */
+  public <T> T retrieve(String key, Callable<T> fn) throws HiveException;
+
+  /**
+   * Retrieve object from cache asynchronously.
+   *
+   * @param <T>
    * @param key
-   * @param fn function to generate the object if it's not there
+   * @param fn
+   *          function to generate the object if it's not there
    * @return the last cached object with the key, null if none.
    */
-  public Object retrieve(String key, Callable<?> fn) throws HiveException;
+  public <T> Future<T> retrieveAsync(String key, Callable<T> fn) throws HiveException;
 }

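ObjectCache is now generic and grows an asynchronous variant: retrieve() keeps
its compute-if-absent contract, while retrieveAsync() returns a Future that
MapJoinOperator above collects later via completeInitializationOp(). Below is a
hedged sketch of a trivial implementation backed by a single-thread executor --
my own illustration, not one of Hive's ObjectCache implementations.

    // Hypothetical ObjectCache implementation, for illustration only.
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class SimpleObjectCache {
      private final ConcurrentMap<String, Object> cache =
          new ConcurrentHashMap<String, Object>();
      private final ExecutorService pool = Executors.newSingleThreadExecutor();

      @SuppressWarnings("unchecked")
      public <T> T retrieve(String key, Callable<T> fn) throws Exception {
        // Compute-if-absent: fn generates the object on a cache miss.
        Object value = cache.get(key);
        if (value == null) {
          value = fn.call();
          cache.putIfAbsent(key, value);
          value = cache.get(key); // keep whichever thread won the race
        }
        return (T) value;
      }

      public <T> Future<T> retrieveAsync(final String key, final Callable<T> fn) {
        // Same contract, but the caller gets a Future to collect later,
        // mirroring how initializeOp() hands Futures to completeInitializationOp().
        return pool.submit(new Callable<T>() {
          @Override
          public T call() throws Exception {
            return retrieve(key, fn);
          }
        });
      }
    }

A caller follows the same shape as the MapJoinOperator hunk above: submit the
loader with retrieveAsync(cacheKey, loader) during initializeOp() and add the
returned Future to the collection handed back to the framework.
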
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Mon Mar 23 22:02:13 2015
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -68,6 +69,7 @@ public abstract class Operator<T extends
   protected List<Operator<? extends OperatorDesc>> parentOperators;
   protected String operatorId;
   private transient ExecMapperContext execContext;
+  private transient boolean rootInitializeCalled = false;
 
   private static AtomicInteger seqId;
 
@@ -101,13 +103,13 @@ public abstract class Operator<T extends
   // dummy operator (for not increasing seqId)
   private Operator(String name) {
     id = name;
+    initOperatorId();
+    childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+    parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
   }
 
   public Operator() {
-    id = String.valueOf(seqId.getAndIncrement());
-    childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
-    parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
-    initOperatorId();
+    this(String.valueOf(seqId.getAndIncrement()));
   }
 
   public static void resetId() {
@@ -252,11 +254,6 @@ public abstract class Operator<T extends
   public void setReporter(Reporter rep) {
     reporter = rep;
 
-    // the collector is same across all operators
-    if (childOperators == null) {
-      return;
-    }
-
     for (Operator<? extends OperatorDesc> op : childOperators) {
       op.setReporter(rep);
     }
@@ -266,11 +263,6 @@ public abstract class Operator<T extends
   public void setOutputCollector(OutputCollector out) {
     this.out = out;
 
-    // the collector is same across all operators
-    if (childOperators == null) {
-      return;
-    }
-
     for (Operator<? extends OperatorDesc> op : childOperators) {
       op.setOutputCollector(out);
     }
@@ -282,10 +274,6 @@ public abstract class Operator<T extends
   public void setAlias(String alias) {
     this.alias = alias;
 
-    if (childOperators == null) {
-      return;
-    }
-
     for (Operator<? extends OperatorDesc> op : childOperators) {
       op.setAlias(alias);
     }
@@ -306,9 +294,6 @@ public abstract class Operator<T extends
    *         otherwise
    */
   protected boolean areAllParentsInitialized() {
-    if (parentOperators == null) {
-      return true;
-    }
     for (Operator<? extends OperatorDesc> parent : parentOperators) {
       if (parent == null) {
         //return true;
@@ -332,7 +317,7 @@ public abstract class Operator<T extends
    * @throws HiveException
    */
   @SuppressWarnings("unchecked")
-  public void initialize(Configuration hconf, ObjectInspector[] inputOIs)
+  public final void initialize(Configuration hconf, ObjectInspector[] inputOIs)
       throws HiveException {
     if (state == State.INIT) {
       return;
@@ -344,7 +329,7 @@ public abstract class Operator<T extends
     }
 
     if (isLogInfoEnabled) {
-      LOG.info("Initializing Self " + this);
+      LOG.info("Initializing operator " + this);
     }
 
     if (inputOIs != null) {
@@ -352,50 +337,69 @@ public abstract class Operator<T extends
     }
 
     // initialize structure to maintain child op info. operator tree changes
-    // while
-    // initializing so this need to be done here instead of initialize() method
-    if (childOperators != null && !childOperators.isEmpty()) {
-      childOperatorsArray = new Operator[childOperators.size()];
-      for (int i = 0; i < childOperatorsArray.length; i++) {
-        childOperatorsArray[i] = childOperators.get(i);
-      }
-      childOperatorsTag = new int[childOperatorsArray.length];
-      for (int i = 0; i < childOperatorsArray.length; i++) {
-        List<Operator<? extends OperatorDesc>> parentOperators = childOperatorsArray[i]
-            .getParentOperators();
-        if (parentOperators == null) {
-          throw new HiveException("Hive internal error: parent is null in "
-              + childOperatorsArray[i].getClass() + "!");
-        }
-        childOperatorsTag[i] = parentOperators.indexOf(this);
-        if (childOperatorsTag[i] == -1) {
-          throw new HiveException(
-              "Hive internal error: cannot find parent in the child operator!");
-        }
+    // while initializing so this need to be done here instead of constructor
+    childOperatorsArray = new Operator[childOperators.size()];
+    for (int i = 0; i < childOperatorsArray.length; i++) {
+      childOperatorsArray[i] = childOperators.get(i);
+    }
+    childOperatorsTag = new int[childOperatorsArray.length];
+    for (int i = 0; i < childOperatorsArray.length; i++) {
+      List<Operator<? extends OperatorDesc>> parentOperators =
+          childOperatorsArray[i].getParentOperators();
+      childOperatorsTag[i] = parentOperators.indexOf(this);
+      if (childOperatorsTag[i] == -1) {
+        throw new HiveException("Hive internal error: cannot find parent in the child operator!");
       }
     }
 
     if (inputObjInspectors.length == 0) {
       throw new HiveException("Internal Error during operator initialization.");
     }
+
     // derived classes can set this to different object if needed
     outputObjInspector = inputObjInspectors[0];
 
-    //pass the exec context to child operators
-    passExecContext(this.execContext);
+    Collection<Future<?>> asyncInitOperations = initializeOp(hconf);
 
-    initializeOp(hconf);
-
-    // sanity check
-    if (childOperatorsArray == null
-        && !(childOperators == null || childOperators.isEmpty())) {
-      throw new HiveException(
-          "Internal Hive error during operator initialization.");
+    // sanity checks
+    if (!rootInitializeCalled
+	|| asyncInitOperations == null
+	|| childOperatorsArray.length != childOperators.size()) {
+      throw new AssertionError("Internal error during operator initialization");
     }
 
     if (isLogInfoEnabled) {
       LOG.info("Initialization Done " + id + " " + getName());
     }
+
+    initializeChildren(hconf);
+
+    // let's wait on the async ops before continuing
+    completeInitialization(asyncInitOperations);
+  }
+
+  private void completeInitialization(Collection<Future<?>> fs) throws HiveException {
+    Object[] os = new Object[fs.size()];
+    int i = 0;
+    for (Future<?> f : fs) {
+      try {
+        os[i++] = f.get();
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+    }
+    completeInitializationOp(os);
+  }
+
+  /**
+   * This method can be used to retrieve the results from async operations
+   * started at init time - before the operator pipeline is started.
+   *
+   * @param os
+   * @throws HiveException
+   */
+  protected void completeInitializationOp(Object[] os) throws HiveException {
+    // no-op default
   }
 
   public void initializeLocalWork(Configuration hconf) throws HiveException {
@@ -410,8 +414,9 @@ public abstract class Operator<T extends
   /**
    * Operator specific initialization.
    */
-  protected void initializeOp(Configuration hconf) throws HiveException {
-    initializeChildren(hconf);
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    rootInitializeCalled = true;
+    return new ArrayList<Future<?>>();
   }
 
   /**
@@ -430,8 +435,7 @@ public abstract class Operator<T extends
       LOG.info("Initializing children of " + id + " " + getName());
     }
     for (int i = 0; i < childOperatorsArray.length; i++) {
-      childOperatorsArray[i].initialize(hconf, outputObjInspector,
-          childOperatorsTag[i]);
+      childOperatorsArray[i].initialize(hconf, outputObjInspector, childOperatorsTag[i]);
       if (reporter != null) {
         childOperatorsArray[i].setReporter(reporter);
       }
@@ -443,10 +447,8 @@ public abstract class Operator<T extends
    */
   public void passExecContext(ExecMapperContext execContext) {
     this.setExecContext(execContext);
-    if(childOperators != null) {
-      for (int i = 0; i < childOperators.size(); i++) {
+    for (int i = 0; i < childOperators.size(); i++) {
         childOperators.get(i).passExecContext(execContext);
-      }
     }
   }
 
@@ -501,7 +503,7 @@ public abstract class Operator<T extends
    *          Rows with the same tag should have exactly the same rowInspector
    *          all the time.
    */
-  public abstract void processOp(Object row, int tag) throws HiveException;
+  public abstract void process(Object row, int tag) throws HiveException;
 
   protected final void defaultStartGroup() throws HiveException {
     if (isLogDebugEnabled) {
@@ -598,7 +600,7 @@ public abstract class Operator<T extends
     // check if all parents are finished
     if (!allInitializedParentsAreClosed()) {
       if (isLogDebugEnabled) {
-	LOG.debug("Not all parent operators are closed. Not closing.");
+        LOG.debug("Not all parent operators are closed. Not closing.");
       }
       return;
     }
@@ -822,7 +824,7 @@ public abstract class Operator<T extends
   protected void forward(Object row, ObjectInspector rowInspector)
       throws HiveException {
 
-    if ((childOperatorsArray == null) || (getDone())) {
+    if (getDone()) {
       return;
     }
 
@@ -832,12 +834,12 @@ public abstract class Operator<T extends
       if (o.getDone()) {
         childrenDone++;
       } else {
-        o.processOp(row, childOperatorsTag[i]);
+        o.process(row, childOperatorsTag[i]);
       }
     }
 
     // if all children are done, this operator is also done
-    if (childrenDone == childOperatorsArray.length) {
+    if (childrenDone != 0 && childrenDone == childOperatorsArray.length) {
       setDone(true);
     }
   }
@@ -878,7 +880,7 @@ public abstract class Operator<T extends
   public void logStats() {
     if (isLogInfoEnabled) {
       for (String e : statsMap.keySet()) {
-	LOG.info(e.toString() + ":" + statsMap.get(e).toString());
+        LOG.info(e.toString() + ":" + statsMap.get(e).toString());
       }
     }
   }
@@ -969,7 +971,7 @@ public abstract class Operator<T extends
    * Initialize an array of ExprNodeEvaluator and return the result
    * ObjectInspectors.
    */
-  protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals,
+  protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator<?>[] evals,
       ObjectInspector rowInspector) throws HiveException {
     ObjectInspector[] result = new ObjectInspector[evals.length];
     for (int i = 0; i < evals.length; i++) {
@@ -982,7 +984,7 @@ public abstract class Operator<T extends
    * Initialize an array of ExprNodeEvaluator from start, for specified length
    * and return the result ObjectInspectors.
    */
-  protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals,
+  protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator<?>[] evals,
       int start, int length,
       ObjectInspector rowInspector) throws HiveException {
     ObjectInspector[] result = new ObjectInspector[length];
@@ -997,7 +999,7 @@ public abstract class Operator<T extends
    * StructObjectInspector with integer field names.
    */
   protected static StructObjectInspector initEvaluatorsAndReturnStruct(
-      ExprNodeEvaluator[] evals, List<String> outputColName,
+      ExprNodeEvaluator<?>[] evals, List<String> outputColName,
       ObjectInspector rowInspector) throws HiveException {
     ObjectInspector[] fieldObjectInspectors = initEvaluators(evals,
         rowInspector);
@@ -1059,12 +1061,6 @@ public abstract class Operator<T extends
 
   public void setExecContext(ExecMapperContext execContext) {
     this.execContext = execContext;
-    if(this.childOperators != null) {
-      for (int i = 0; i<this.childOperators.size();i++) {
-        Operator<? extends OperatorDesc> op = this.childOperators.get(i);
-        op.setExecContext(execContext);
-      }
-    }
   }
 
   // The input file has changed - every operator can invoke specific action
@@ -1128,6 +1124,7 @@ public abstract class Operator<T extends
    * @return Cloned operator
    * @throws CloneNotSupportedException
    */
+  @SuppressWarnings("unchecked")
   public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
     T descClone = (T) conf.clone();
     Operator<? extends OperatorDesc> ret =
@@ -1148,11 +1145,6 @@ public abstract class Operator<T extends
       throws CloneNotSupportedException {
     Operator<? extends OperatorDesc> newOp = this.cloneOp();
     newOp.setParentOperators(this.parentOperators);
-    // Fix parent in all children
-    if (this.getChildOperators() == null) {
-      newOp.setChildOperators(null);
-      return newOp;
-    }
     List<Operator<? extends OperatorDesc>> newChildren =
         new ArrayList<Operator<? extends OperatorDesc>>();
 
@@ -1301,12 +1293,13 @@ public abstract class Operator<T extends
     if (conf != null) {
       return conf.getStatistics();
     }
+
     return null;
   }
 
   public OpTraits getOpTraits() {
     if (conf != null) {
-      return conf.getOpTraits();
+      return conf.getTraits();
     }
 
     return null;
@@ -1314,36 +1307,48 @@ public abstract class Operator<T extends
 
   public void setOpTraits(OpTraits metaInfo) {
     if (isLogDebugEnabled) {
-      LOG.debug("Setting traits ("+metaInfo+") on "+this);
+      LOG.debug("Setting traits (" + metaInfo + ") on " + this);
     }
     if (conf != null) {
-      conf.setOpTraits(metaInfo);
+      conf.setTraits(metaInfo);
     } else {
-      LOG.warn("Cannot set traits when there's no descriptor: "+this);
+      LOG.warn("Cannot set traits when there's no descriptor: " + this);
     }
   }
 
   public void setStatistics(Statistics stats) {
     if (isLogDebugEnabled) {
-      LOG.debug("Setting stats ("+stats+") on "+this);
+      LOG.debug("Setting stats (" + stats + ") on " + this);
     }
     if (conf != null) {
       conf.setStatistics(stats);
     } else {
-      LOG.warn("Cannot set stats when there's no descriptor: "+this);
+      LOG.warn("Cannot set stats when there's no descriptor: " + this);
     }
   }
 
+  @SuppressWarnings("rawtypes")
   public static Operator createDummy() {
     return new DummyOperator();
   }
 
+  @SuppressWarnings({ "serial", "unchecked", "rawtypes" })
   private static class DummyOperator extends Operator {
     public DummyOperator() { super("dummy"); }
+
     @Override
-    public void processOp(Object row, int tag) { }
+    public void process(Object row, int tag) {
+    }
+
     @Override
-    public OperatorType getType() { return null; }
+    public OperatorType getType() {
+      return null;
+    }
+
+    @Override
+    protected Collection<Future<?>> initializeOp(Configuration conf) {
+      return childOperators;
+    }
   }
 
   public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {

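Taken together, the Operator.java hunks above define a new initialization
contract: initialize() is now final; each subclass's initializeOp() must
delegate to super.initializeOp() (enforced via the rootInitializeCalled flag)
and return a Collection<Future<?>>; initializeChildren() is driven from
initialize() rather than from each initializeOp(); and the Futures' results are
delivered to completeInitializationOp() once they resolve. A minimal
hypothetical subclass following that contract (ExampleOperator, ExampleDesc and
the commented-out loader are made-up names, not part of this commit):

    // Hypothetical operator illustrating the new async-init contract.
    import java.util.Collection;
    import java.util.concurrent.Future;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.api.OperatorType;

    public class ExampleOperator extends Operator<ExampleDesc> {
      private static final long serialVersionUID = 1L;
      private transient Object loadedTable;

      @Override
      protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
        // Delegate first: this sets rootInitializeCalled and returns the
        // collection that initialize() will wait on.
        Collection<Future<?>> result = super.initializeOp(hconf);
        // Slow work goes into the collection as a Future, e.g.:
        //   result.add(cache.retrieveAsync(cacheKey, loader));
        return result;
      }

      @Override
      protected void completeInitializationOp(Object[] os) throws HiveException {
        // Results of the Futures added above arrive here, in order.
        if (os.length != 0) {
          loadedTable = os[0];
        }
      }

      @Override
      public void process(Object row, int tag) throws HiveException {
        forward(row, inputObjInspectors[tag]);
      }

      @Override
      public OperatorType getType() {
        return null; // a real operator returns its OperatorType constant
      }
    }

Returning null from initializeOp() now trips the sanity check in initialize(),
which is why even pass-through overrides such as LateralViewForwardOperator's
above simply return super.initializeOp(hconf).
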
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Mon Mar 23 22:02:13 2015
@@ -18,6 +18,12 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
@@ -62,16 +68,13 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * OperatorFactory.
  *
  */
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public final class OperatorFactory {
+  protected static transient final Log LOG = LogFactory.getLog(OperatorFactory.class);
   private static final List<OpTuple> opvec;
   private static final List<OpTuple> vectorOpvec;
 
@@ -227,9 +230,6 @@ public final class OperatorFactory {
     // Add this parent to the children
     for (Operator<? extends OperatorDesc> op : oplist) {
       List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
-      if (parents == null) {
-        parents = new ArrayList<Operator<? extends OperatorDesc>>();
-      }
       parents.add(ret);
       op.setParentOperators(parents);
     }
@@ -259,9 +259,6 @@ public final class OperatorFactory {
     // Add the new operator as child of each of the passed in operators
     for (Operator op : oplist) {
       List<Operator> children = op.getChildOperators();
-      if (children == null) {
-        children = new ArrayList<Operator>();
-      }
       children.add(ret);
       op.setChildOperators(children);
     }
@@ -286,17 +283,13 @@ public final class OperatorFactory {
     Operator<T> ret = get((Class<T>) conf.getClass());
     ret.setConf(conf);
     if (oplist.size() == 0) {
-      return (ret);
+      return ret;
     }
 
     // Add the new operator as child of each of the passed in operators
     for (Operator op : oplist) {
       List<Operator> children = op.getChildOperators();
-      if (children == null) {
-        children = new ArrayList<Operator>();
-      }
       children.add(ret);
-      op.setChildOperators(children);
     }
 
     // add parents for the newly created operator
@@ -308,7 +301,7 @@ public final class OperatorFactory {
 
     ret.setParentOperators(parent);
 
-    return (ret);
+    return ret;
   }
 
   /**
@@ -318,7 +311,7 @@ public final class OperatorFactory {
       RowSchema rwsch, Operator... oplist) {
     Operator<T> ret = getAndMakeChild(conf, oplist);
     ret.setSchema(rwsch);
-    return (ret);
+    return ret;
   }
 
   /**

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Mon Mar 23 22:02:13 2015
@@ -122,7 +122,7 @@ public class OperatorUtils {
       if(op.getName().equals(ReduceSinkOperator.getOperatorName())) {
         ReduceSinkOperator rs = ((ReduceSinkOperator)op);
         if (outMap.containsKey(rs.getConf().getOutputName())) {
-          LOG.info("Setting output collector: " + rs + " --> " 
+          LOG.info("Setting output collector: " + rs + " --> "
             + rs.getConf().getOutputName());
           rs.setOutputCollector(outMap.get(rs.getConf().getOutputName()));
         }
@@ -234,9 +234,7 @@ public class OperatorUtils {
             resultMap.put(clazz, op);
           }
         }
-        if (op.getChildOperators() != null) {
-          allChildren.addAll(op.getChildOperators());
-        }
+        allChildren.addAll(op.getChildOperators());
       }
       ops = allChildren;
     }

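The same assumption surfaces in OperatorUtils: the level-by-level walk can now collect each operator's children unconditionally. A small sketch of that walk over an acyclic plan, again with hypothetical simplified types:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the breadth-first, level-by-level walk in OperatorUtils.
// With child lists assumed non-null, each level's children are added without
// a guard. Assumes an acyclic plan, as operator DAGs are.
public class WalkSketch {

  static class Node {
    final List<Node> children = new ArrayList<>();
    List<Node> getChildOperators() { return children; }
  }

  static Map<Class<?>, Node> findLast(List<Node> start, Class<?> clazz) {
    Map<Class<?>, Node> resultMap = new HashMap<>();
    List<Node> ops = start;
    while (!ops.isEmpty()) {
      List<Node> allChildren = new ArrayList<>();
      for (Node op : ops) {
        if (clazz.isInstance(op)) {
          resultMap.put(clazz, op);   // later levels overwrite earlier ones
        }
        allChildren.addAll(op.getChildOperators());  // no null guard needed
      }
      ops = allChildren;
    }
    return resultMap;
  }
}
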
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java Mon Mar 23 22:02:13 2015
@@ -56,7 +56,7 @@ public class OrcFileMergeOperator extend
   private FSDataInputStream fdis;
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     Object[] keyValue = (Object[]) row;
     processKeyValuePairs(keyValue[0], keyValue[1]);
   }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Mon Mar 23 22:02:13 2015
@@ -19,12 +19,13 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Stack;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -66,7 +67,8 @@ public class PTFOperator extends Operato
    * 4. Create input partition to store rows coming from previous operator
    */
   @Override
-  protected void initializeOp(Configuration jobConf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration jobConf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(jobConf);
     hiveConf = jobConf;
     isMapOperator = conf.isMapSide();
 
@@ -84,8 +86,7 @@ public class PTFOperator extends Operato
     ptfInvocation = setupChain();
     ptfInvocation.initializeStreaming(jobConf, isMapOperator);
     firstMapRow = true;
-
-    super.initializeOp(jobConf);
+    return result;
   }
 
   @Override
@@ -96,7 +97,7 @@ public class PTFOperator extends Operato
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     if (!isMapOperator ) {
       /*
       * check if current row belongs to the current accumulated Partition:

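The PTFOperator hunks show the initialization contract this whole commit migrates to: initializeOp now returns a Collection<Future<?>> of pending asynchronous work, subclasses invoke super.initializeOp(...) first rather than calling initializeChildren(...) or super at the end, and they pass the returned collection back up. A hedged sketch of that contract with simplified stand-in types:

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Future;

// Sketch of the new initialization contract seen throughout this commit,
// with hypothetical simplified types (not Hive's Operator hierarchy).
public class InitContractSketch {

  static abstract class BaseOp {
    protected Collection<Future<?>> initializeOp(Object hconf) {
      // In the real Operator this also covers child initialization; an
      // empty collection stands in for "no asynchronous work" here.
      return Collections.emptyList();
    }
  }

  static class MyOp extends BaseOp {
    @Override
    protected Collection<Future<?>> initializeOp(Object hconf) {
      Collection<Future<?>> result = super.initializeOp(hconf);  // first, not last
      // ... operator-specific setup (object inspectors, evaluators, ...) ...
      return result;  // hand any pending futures back to the framework
    }
  }
}
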
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java Mon Mar 23 22:02:13 2015
@@ -43,7 +43,7 @@ public class RCFileMergeOperator
   int columnNumber = 0;
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     Object[] keyValue = (Object[]) row;
     processKeyValuePairs(keyValue[0], keyValue[1]);
   }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Mon Mar 23 22:02:13 2015
@@ -18,12 +18,16 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -48,14 +52,12 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.hash.MurmurHash;
 
-import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
-
 /**
  * Reduce Sink Operator sends output to the reduce stage.
  **/
@@ -153,7 +155,8 @@ public class ReduceSinkOperator extends
   private final transient LongWritable recordCounter = new LongWritable();
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     try {
 
       numRows = 0;
@@ -237,12 +240,12 @@ public class ReduceSinkOperator extends
       useUniformHash = conf.getReducerTraits().contains(UNIFORM);
 
       firstRow = true;
-      initializeChildren(hconf);
     } catch (Exception e) {
       String msg = "Error initializing ReduceSinkOperator: " + e.getMessage();
       LOG.error(msg, e);
       throw new RuntimeException(e);
     }
+    return result;
   }
 
 
@@ -291,7 +294,7 @@ public class ReduceSinkOperator extends
 
   @Override
   @SuppressWarnings("unchecked")
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     try {
       ObjectInspector rowInspector = inputObjInspectors[tag];
       if (firstRow) {
@@ -514,6 +517,7 @@ public class ReduceSinkOperator extends
     return keyWritable;
   }
 
+  @Override
   public void collect(byte[] key, byte[] value, int hash) throws IOException {
     HiveKey keyWritable = new HiveKey(key, hash);
     BytesWritable valueWritable = new BytesWritable(value);
@@ -608,6 +612,7 @@ public class ReduceSinkOperator extends
     return inputAliases;
   }
 
+  @Override
   public void setOutputCollector(OutputCollector _out) {
     this.out = _out;
   }

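Beyond the process rename and the new initialization contract, the ReduceSinkOperator hunks mostly reorder imports; the UNIFORM constant is now statically imported from ReduceSinkDesc.ReducerTraits. A tiny illustrative sketch of the trait check; the enum below is a stand-in, and the constants other than UNIFORM are illustrative guesses:

import java.util.EnumSet;

// Hypothetical stand-in for the reducer-traits check. In Hive, ReducerTraits
// lives on ReduceSinkDesc, and the UNIFORM trait selects a uniform
// (murmur-style) hash when spreading keys across reducers.
public class TraitsSketch {
  enum ReducerTraits { UNIFORM, FIXED, AUTOPARALLEL }

  public static void main(String[] args) {
    EnumSet<ReducerTraits> traits = EnumSet.of(ReducerTraits.UNIFORM);
    boolean useUniformHash = traits.contains(ReducerTraits.UNIFORM);
    System.out.println("useUniformHash = " + useUniformHash);  // true
  }
}
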
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Mon Mar 23 22:02:13 2015
@@ -21,10 +21,12 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -91,7 +93,7 @@ public class SMBMapJoinOperator extends
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
 
     // If there is a sort-merge join followed by a regular join, the SMBJoinOperator may not
     // get initialized at all. Consider the following query:
@@ -99,7 +101,7 @@ public class SMBMapJoinOperator extends
     // For the mapper processing C, The SMJ is not initialized, no need to close it either.
     initDone = true;
 
-    super.initializeOp(hconf);
+    Collection<Future<?>> result = super.initializeOp(hconf);
 
     closeCalled = false;
 
@@ -154,6 +156,7 @@ public class SMBMapJoinOperator extends
       }
       foundNextKeyGroup[pos] = false;
     }
+    return result;
   }
 
   @Override
@@ -195,7 +198,7 @@ public class SMBMapJoinOperator extends
       HiveInputFormat.pushFilters(jobClone, ts);
 
 
-      ts.setExecContext(getExecContext());
+      ts.passExecContext(getExecContext());
 
       FetchOperator fetchOp = new FetchOperator(fetchWork, jobClone);
       ts.initialize(jobClone, new ObjectInspector[]{fetchOp.getOutputObjectInspector()});
@@ -231,7 +234,7 @@ public class SMBMapJoinOperator extends
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
 
     if (tag == posBigTable) {
       if (inputFileChanged) {
@@ -555,7 +558,7 @@ public class SMBMapJoinOperator extends
         fetchDone[tag] = true;
         return;
       }
-      forwardOp.processOp(row.o, tag);
+      forwardOp.process(row.o, tag);
       // check if any operator had a fatal error or early exit during
       // execution
       if (forwardOp.getDone()) {
@@ -803,7 +806,7 @@ public class SMBMapJoinOperator extends
 
         // Pass the row though the operator tree. It is guaranteed that not more than 1 row can
         // be produced from a input row.
-        forwardOp.processOp(nextRow.o, 0);
+        forwardOp.process(nextRow.o, 0);
         nextRow = sinkOp.getResult();
 
         // It is possible that the row got absorbed in the operator tree.

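The rename is visible at call sites too: the SMB join drives its small-table side by fetching rows itself and pushing each one through forwardOp.process(row.o, tag). A minimal sketch of that push loop, with hypothetical stand-in types:

// Simplified push-based row flow, as at the patched call sites: a driver
// fetches rows and pushes each one into the operator tree via
// process(row, tag) (formerly processOp), with an early exit when the tree
// reports it is done.
public class PushLoopSketch {

  interface RowSource { Object next(); }     // simplified FetchOperator stand-in

  static class OpNode {
    private boolean done;
    boolean getDone() { return done; }
    void setDone(boolean d) { done = d; }    // set by downstream operators
    void process(Object row, int tag) { /* operator-specific work */ }
  }

  static void drive(RowSource source, OpNode forwardOp, int tag) {
    Object row;
    while ((row = source.next()) != null) {
      forwardOp.process(row, tag);
      if (forwardOp.getDone()) {             // early exit, as in the real loop
        break;
      }
    }
  }
}
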
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Mar 23 22:02:13 2015
@@ -27,12 +27,14 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -259,7 +261,8 @@ public class ScriptOperator extends Oper
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     firstRow = true;
 
     statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
@@ -280,11 +283,10 @@ public class ScriptOperator extends Oper
 
       outputObjInspector = scriptOutputDeserializer.getObjectInspector();
 
-      // initialize all children before starting the script
-      initializeChildren(hconf);
     } catch (Exception e) {
       throw new HiveException(ErrorMsg.SCRIPT_INIT_ERROR.getErrorCodedMsg(), e);
     }
+    return result;
   }
 
   boolean isBrokenPipeException(IOException e) {
@@ -321,7 +323,7 @@ public class ScriptOperator extends Oper
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     // initialize the user's process only when you receive the first row
     if (firstRow) {
       firstRow = false;
@@ -573,6 +575,7 @@ public class ScriptOperator extends Oper
       this.rowInspector = rowInspector;
     }
 
+    @Override
     public void processLine(Writable line) throws HiveException {
       try {
         row = scriptOutputDeserializer.deserialize(line);
@@ -583,6 +586,7 @@ public class ScriptOperator extends Oper
       forward(row, rowInspector);
     }
 
+    @Override
     public void close() {
     }
   }
@@ -651,6 +655,7 @@ public class ScriptOperator extends Oper
       }
     }
 
+    @Override
     public void processLine(Writable line) throws HiveException {
 
       String stringLine = line.toString();
@@ -693,6 +698,7 @@ public class ScriptOperator extends Oper
       bytesCopied += len;
     }
 
+    @Override
     public void close() {
     }
 

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Mon Mar 23 22:02:13 2015
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,12 +44,12 @@ public class SelectOperator extends Oper
   private transient boolean isSelectStarNoCompute = false;
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     // Just forward the row as is
     if (conf.isSelStarNoCompute()) {
-      initializeChildren(hconf);
       isSelectStarNoCompute = true;
-      return;
+      return result;
     }
     List<ExprNodeDesc> colList = conf.getColList();
     eval = new ExprNodeEvaluator[colList.size()];
@@ -64,11 +66,11 @@ public class SelectOperator extends Oper
     }
     outputObjInspector = initEvaluatorsAndReturnStruct(eval, conf.getOutputColumnNames(),
         inputObjInspectors[0]);
-    initializeChildren(hconf);
+    return result;
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     if (isSelectStarNoCompute) {
       forward(row, inputObjInspectors[tag]);
       return;

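A simplified sketch of what SelectOperator.process() does per row, including the SELECT * fast path set up in initializeOp; Function here is a hypothetical stand-in for Hive's ExprNodeEvaluator:

import java.util.function.Function;

// One evaluator per output column, each applied to the incoming row; the
// SELECT * no-compute path skips evaluation entirely and forwards the row.
public class ProjectSketch {

  static Object project(Object row, Function<Object, Object>[] eval,
      boolean isSelectStarNoCompute) {
    if (isSelectStarNoCompute) {
      return row;                        // fast path: forward the row as is
    }
    Object[] output = new Object[eval.length];
    for (int i = 0; i < eval.length; i++) {
      output[i] = eval[i].apply(row);    // evaluate column expression i
    }
    return output;
  }
}
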
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java Mon Mar 23 22:02:13 2015
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.Future;
 
 import org.apache.commons.io.FileExistsException;
 import org.apache.commons.logging.Log;
@@ -50,7 +52,7 @@ public class SparkHashTableSinkOperator
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   protected static final Log LOG = LogFactory.getLog(SparkHashTableSinkOperator.class.getName());
 
-  private HashTableSinkOperator htsOperator;
+  private final HashTableSinkOperator htsOperator;
 
   // The position of this table
   private byte tag;
@@ -60,18 +62,20 @@ public class SparkHashTableSinkOperator
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     ObjectInspector[] inputOIs = new ObjectInspector[conf.getTagLength()];
     inputOIs[tag] = inputObjInspectors[0];
     conf.setTagOrder(new Byte[]{ tag });
     htsOperator.setConf(conf);
     htsOperator.initialize(hconf, inputOIs);
+    return result;
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     // Ignore the tag passed in, which should be 0, not what we want
-    htsOperator.processOp(row, this.tag);
+    htsOperator.process(row, this.tag);
   }
 
   @Override

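Note the tag handling in SparkHashTableSinkOperator.process(): the framework's tag (always 0 on this path) is deliberately ignored in favor of the table position stored in the operator. A minimal sketch of the delegation, with simplified stand-in types:

// The wrapper forwards rows to the wrapped hash table sink under its own
// table tag rather than the tag supplied by the caller.
public class DelegatingSinkSketch {

  interface Sink { void process(Object row, int tag); }

  static class TaggedSink {
    private final Sink htsOperator;   // the wrapped hash table sink
    private final byte tag;           // this table's position in the join

    TaggedSink(Sink htsOperator, byte tag) {
      this.htsOperator = htsOperator;
      this.tag = tag;
    }

    void process(Object row, int ignoredTag) {
      htsOperator.process(row, this.tag);  // use our tag, not the caller's
    }
  }
}
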
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Mon Mar 23 22:02:13 2015
@@ -20,9 +20,11 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -84,7 +86,7 @@ public class TableScanOperator extends O
    * operator will be enhanced to read the table.
    **/
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     if (rowLimit >= 0 && currCount++ >= rowLimit) {
       setDone(true);
       return;
@@ -153,9 +155,9 @@ public class TableScanOperator extends O
           values.add(o == null ? defaultPartitionName : o.toString());
         }
         partitionSpecs = FileUtils.makePartName(conf.getPartColumns(), values);
-	if (isLogInfoEnabled) {
-	  LOG.info("Stats Gathering found a new partition spec = " + partitionSpecs);
-	}
+        if (isLogInfoEnabled) {
+          LOG.info("Stats Gathering found a new partition spec = " + partitionSpecs);
+        }
       }
       // find which column contains the raw data size (both partitioned and non partitioned
       int uSizeColumn = -1;
@@ -191,16 +193,17 @@ public class TableScanOperator extends O
   }
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
-    initializeChildren(hconf);
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     inputFileChanged = false;
 
     if (conf == null) {
-      return;
+      return result;
     }
+
     rowLimit = conf.getRowLimit();
     if (!conf.isGatherStats()) {
-      return;
+      return result;
     }
 
     this.hconf = hconf;
@@ -216,9 +219,9 @@ public class TableScanOperator extends O
     stats = new HashMap<String, Stat>();
     if (conf.getPartColumns() == null || conf.getPartColumns().size() == 0) {
       // NON PARTITIONED table
-      return;
+      return result;
     }
-
+    return result;
   }
 
   @Override
@@ -282,7 +285,7 @@ public class TableScanOperator extends O
     if (!statsPublisher.connect(jc)) {
       // just return, stats gathering should not block the main query.
       if (isLogInfoEnabled) {
-	LOG.info("StatsPublishing error: cannot connect to database.");
+        LOG.info("StatsPublishing error: cannot connect to database.");
       }
       if (isStatsReliable) {
         throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());

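The TableScanOperator hunks also fix tab/space indentation in two logging blocks; the process() hunk shows the row-limit early exit. A minimal sketch of that mechanism (simplified stand-in types; compare the getDone() check in the push loop sketched earlier):

// Once the configured limit is reached, the scan marks itself done so the
// driver stops pushing rows into it.
public class RowLimitSketch {
  private final int rowLimit;    // -1 means no limit
  private int currCount;
  private boolean done;

  RowLimitSketch(int rowLimit) { this.rowLimit = rowLimit; }

  void process(Object row) {
    if (rowLimit >= 0 && currCount++ >= rowLimit) {
      done = true;               // signal upstream to stop pushing rows
      return;
    }
    // forward(row, ...) would run here in the real operator
  }

  boolean getDone() { return done; }
}
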
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/TezDummyStoreOperator.java Mon Mar 23 22:02:13 2015
@@ -33,8 +33,8 @@ public class TezDummyStoreOperator exten
    * the records.
    */
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
-    super.processOp(row, tag);
+  public void process(Object row, int tag) throws HiveException {
+    super.process(row, tag);
     forward(result.o, outputObjInspector);
   }
 }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java Mon Mar 23 22:02:13 2015
@@ -20,7 +20,9 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -57,7 +59,8 @@ public class UDTFOperator extends Operat
   transient AutoProgressor autoProgressor;
 
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
     genericUDTF = conf.getGenericUDTF();
     collector = new UDTFCollector(this);
 
@@ -90,13 +93,11 @@ public class UDTFOperator extends Operat
               hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS));
       autoProgressor.go();
     }
-
-    // Initialize the rest of the operator DAG
-    super.initializeOp(hconf);
+    return result;
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
+  public void process(Object row, int tag) throws HiveException {
     // The UDTF expects arguments in an object[]
     StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
     List<? extends StructField> fields = soi.getAllStructFieldRefs();

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Mon Mar 23 22:02:13 2015
@@ -20,7 +20,9 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -54,7 +56,8 @@ public class UnionOperator extends Opera
    * needsTransform[].
    */
   @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
 
     int parents = parentOperators.size();
     parentObjInspectors = new StructObjectInspector[parents];
@@ -116,11 +119,11 @@ public class UnionOperator extends Opera
             + "] from " + inputObjInspectors[p] + " to " + outputObjInspector);
       }
     }
-    initializeChildren(hconf);
+    return result;
   }
 
   @Override
-  public synchronized void processOp(Object row, int tag) throws HiveException {
+  public synchronized void process(Object row, int tag) throws HiveException {
 
     StructObjectInspector soi = parentObjInspectors[tag];
     List<? extends StructField> fields = parentFields[tag];

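A loose sketch of UnionOperator's per-parent handling, with hypothetical simplified types: rows from parents whose representation differs from the union's output shape get copied, and process stays synchronized because, unlike most operators, several parents can feed one union:

import java.util.ArrayList;
import java.util.List;

// needsTransform is indexed by parent tag, mirroring the flags the real
// operator computes in initializeOp from the parent object inspectors.
public class UnionSketch {
  private final boolean[] needsTransform;

  UnionSketch(boolean[] needsTransform) { this.needsTransform = needsTransform; }

  synchronized List<Object> process(List<Object> row, int tag) {
    if (!needsTransform[tag]) {
      return row;                           // already in the common shape
    }
    return new ArrayList<>(row);            // copy into the unified layout
  }
}
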
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Mon Mar 23 22:02:13 2015
@@ -24,9 +24,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -105,6 +105,7 @@ public class ExecMapper extends MapReduc
       }
       mo.setConf(mrwork);
       // initialize map operator
+      mo.initialize(job, null);
       mo.setChildren(job);
       l4j.info(mo.dump(0));
       // initialize map local work
@@ -113,9 +114,9 @@ public class ExecMapper extends MapReduc
 
       MapredContext.init(true, new JobConf(jc));
 
-      mo.setExecContext(execContext);
+      mo.passExecContext(execContext);
       mo.initializeLocalWork(jc);
-      mo.initialize(jc, null);
+      mo.initializeMapOperator(jc);
 
       if (localWork == null) {
         return;
@@ -126,7 +127,7 @@ public class ExecMapper extends MapReduc
       l4j.info("Initializing dummy operator");
       List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
       for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
-        dummyOp.setExecContext(execContext);
+        dummyOp.passExecContext(execContext);
         dummyOp.initialize(jc,null);
       }
     } catch (Throwable e) {

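The ExecMapper hunk reorders the map-side bring-up: initialize(job, null) now runs before setChildren(job), the exec context is handed down with passExecContext (renamed from setExecContext), and the sequence ends with the new initializeMapOperator(jc). A hedged sketch of the new order, with simplified stand-in types:

// Method names follow the patch; the types are hypothetical stand-ins for
// MapOperator and its configuration objects.
public class MapperSetupSketch {

  interface MapOp {
    void initialize(Object job, Object[] inspectors) throws Exception;
    void setChildren(Object job) throws Exception;
    void passExecContext(Object ctx);
    void initializeLocalWork(Object jc) throws Exception;
    void initializeMapOperator(Object jc) throws Exception;
  }

  static void configure(MapOp mo, Object job, Object jc, Object execContext)
      throws Exception {
    mo.initialize(job, null);         // 1. base initialization first (new order)
    mo.setChildren(job);              // 2. then wire up the operator tree
    mo.passExecContext(execContext);  // 3. propagate the exec context downward
    mo.initializeLocalWork(jc);       // 4. map-local (small-table) setup
    mo.initializeMapOperator(jc);     // 5. finish map-side initialization
  }
}
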
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Mon Mar 23 22:02:13 2015
@@ -232,7 +232,7 @@ public class ExecReducer extends MapRedu
         row.add(valueObject[tag]);
 
         try {
-          reducer.processOp(row, tag);
+          reducer.process(row, tag);
         } catch (Exception e) {
           String rowString = null;
           try {

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java Mon Mar 23 22:02:13 2015
@@ -32,11 +32,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -62,7 +61,8 @@ public class HashTableLoader implements
   private MapJoinDesc desc;
 
   @Override
-  public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
+  public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
+      MapJoinOperator joinOp) {
     this.context = context;
     this.hconf = hconf;
     this.joinOp = joinOp;

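HashTableLoader.init(...) gains a MapredContext parameter. A hypothetical sketch of an implementation tracking the widened interface; all types below are simplified stand-ins, not Hive's real classes:

public class LoaderInitSketch {

  interface HashTableLoader {
    void init(Object execContext, Object mrContext, Object hconf, Object joinOp);
  }

  static class MrHashTableLoader implements HashTableLoader {
    private Object context, hconf, joinOp;

    @Override
    public void init(Object context, Object mrContext, Object hconf, Object joinOp) {
      // The MR backend keeps what it needs; mrContext is accepted to satisfy
      // the common interface even where this implementation does not use it.
      this.context = context;
      this.hconf = hconf;
      this.joinOp = joinOp;
    }
  }
}
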
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Mon Mar 23 22:02:13 2015
@@ -404,7 +404,7 @@ public class MapredLocalTask extends Tas
         if (row == null) {
           break;
         }
-        forwardOp.processOp(row.o, 0);
+        forwardOp.process(row.o, 0);
       }
       forwardOp.flush();
     }
@@ -445,7 +445,7 @@ public class MapredLocalTask extends Tas
       Operator<? extends OperatorDesc> forwardOp = work.getAliasToWork().get(alias);
 
       // put the exe context into all the operators
-      forwardOp.setExecContext(execContext);
+      forwardOp.passExecContext(execContext);
       // All the operators need to be initialized before process
       FetchOperator fetchOp = entry.getValue();
       JobConf jobConf = fetchOpJobConfMap.get(fetchOp);


